Features

Realtime TypeScript SDK v3.32.0+ Go SDK v0.9.0+

Realtime is currently in developer preview. Some details including APIs are still subject to change during this period. Read more about the developer preview here.

Realtime allows you to stream data from workflows to your users without configuring infrastructure or maintaining state. This enables you to build interactive applications, show status updates, or stream AI responses to your users directly in your existing code.

Usage

There are two core parts of Realtime: publishing and subscribing. You must publish data from your functions for it to be visible. Publishing data accepts three parameters:

  • data, the data to be published to the realtime stream
  • channel, the name for a group of topics, e.g., user:123
  • topic, the (optionally typed and validated) topic name, allowing you to differentiate between types of data

Subscriptions receive data by subscribing to topics within a channel. We manage the WebSocket connections, publishing, subscribing, and state for you.

Getting started

To use realtime, start by installing the @inngest/realtime package:

npm install @inngest/realtime

Publishing

Using our APIs, you publish specific data that users can subscribe to. Here's a basic example:

import { Inngest } from "inngest";
import { realtimeMiddleware, channel, topic } from "@inngest/realtime";

const inngest = new Inngest({
  id: "my-app",
  // Whenever you create your app, include the `realtimeMiddleware()`
  middleware: [realtimeMiddleware()],
});

// create a channel for each user, given a user ID. A channel is a namespace for one or more topics of streams.
const userChannel = channel((userId: string) => `user:${userId}`)
  // Add a specific topic, eg. "ai" for all AI data within the user's channel
  .addTopic(
    topic("ai").schema(
      z.object({
        response: z.string(),
        // Transforms are supported for realtime data
        success: z.number().transform(Boolean),
      })
    )
  );

// we can also create global channels that do not require input
const logsChannel = channel("logs").addTopic(topic("info").type<string>());

inngest.createFunction(
  { id: "some-task" },
  { event: "ai/ai.requested" },
  async ({ event, step, publish }) => {
    // Publish data to the given channel, on the given topic.
    await publish(
      userChannel(event.data.userId).ai({
        response: "an llm response here",
        success: true,
      })
    );

    await publish(logsChannel().info("All went well"));
  }
);

Subscribing

Subscribing can be done using an Inngest client that either has a valid signing key or a subscription token. You can subscribe from the client or the backend.

Subscribe from the client

1

Create a subscription token

Subscription tokens should be created on the server and passed to the client. You can create a new endpoint to generate a token, checking things like user permissions or channel subscriptions.

Here's an example of a server endpoint that creates a token, scoped to a user's channel and specific topics.

// ex. /app/actions/get-subscribe-token.ts
"use server";

import { getInngestApp } from "@/inngest";
import { helloChannel } from "@/inngest/functions/helloWorld";
import { getSubscriptionToken, Realtime } from "@inngest/realtime";

export type HelloToken = Realtime.Token<typeof helloChannel, ["logs"]>;

export async function fetchRealtimeSubscriptionToken(): Promise<HelloToken> {
  const token = await getSubscriptionToken(getInngestApp(), {
    channel: helloChannel(),
    topics: ["logs"],
  });

  return token;
}
2

Subscribe to a channel

Once you have a token, you can subscribe to a channel by calling the subscribe function with the token. You can also subscribe using the useInngestSubscription React hook. Read more about the React hook here.

// ex: ./app/page.tsx
"use client";

import { useInngestSubscription } from "@inngest/realtime/hooks";
import { useState } from "react";
import { fetchRealtimeSubscriptionToken } from "./actions";

export default function Home() {
  const { data, error, freshData, state, latestData } = useInngestSubscription({
    refreshToken: fetchRealtimeSubscriptionToken,
  });

  return (
    <div>
      {data.map((message, i) => (
        <div key={i}>{message.data}</div>
      ))}
    </div>
  );
}

Both Next.js Server Actions and subscribe() approaches offer a fully typed experience.

The Next.js Server Action relies on the Realtime.Token<> type helper to get a typed token.

The subscribe() approach uses the typeOnlyChannel() helper to get a typed channel.

That's all you need to do to subscribe to a channel from the client!

Subscribe from the backend

Subscribing on the backend is simple:

import { subscribe } from "@inngest/realtime";
import { userChannel } from "./channels";

const stream = await subscribe({
  channel: userChannel("123"),
  topics: ["ai"], // subscribe to one or more topics in the user channel
});

// The returned `stream` from `subscribe()` is a `ReadableStream` that can be
// used with `getReader()`

// Example 1: ReadableStream
// Example 1: ReadableStream
const reader = stream.getReader();
const { done, value } = await reader.read();
if (!done) {
  console.log(value); // `value` is fully typed
}

// Example 2: Convert to an async iterator to enable for await loops
async function* streamAsyncIterator<T>(stream: ReadableStream<T>): AsyncGenerator<T> {
  // Get a lock on the stream
  const reader = stream.getReader();

  try {
    while (true) {
      // Read from the stream
      const { done, value } = await reader.read();
      // Exit if we're done
      if (done) return;
      // Else yield the chunk
      yield value;
    }
  } finally {
    reader.releaseLock();
  }
}

for await (const message of streamAsyncIterator(stream)) {
  console.log(message); // `message` is fully typed
}

Concepts

Channels

Channels are environment-level containers that group one or more topics of data. You can create as many channels as you need. Some tips:

  • You can subscribe to a channel before any data is published
  • You can create a channel for a specific run ID, e.g., for a run's status: run:${ctx.runId}
  • You can create channels for each user, or for a given conversation

Topics

Topics allow you to specify individual streams within a channel. For example, within a given run you may publish status updates, AI responses, and tool outputs to a user.

Benefits of separating data by topics include:

  • Typing and data handling: You can switch on the topic name to properly type and handle different streams of data within a channel.
  • Security: You must specify topics when creating subscription tokens, allowing you to protect or hide specific published data.

Subscription Tokens

Subscription tokens allow you to subscribe to the specified channel's topics. Tokens expire 1 minute after creation for security purposes. Once connected, you do not need to manage authentication or re-issue tokens to keep the connection active.

SDK Support

Realtime is supported in the following SDKs:

SDKPublishSubscribeVersion
TypeScript>=v3.32.0
Golang>=v0.9.0
PythonIn progressIn progress-

Limitations

  • The number of currently active topics depends on your Inngest plan
  • Data sent is currently at-most-once and ephemeral
  • The max message size is currently 512KB

Developer preview

Realtime is available as a developer preview. During this period:

  • This feature is widely available for all Inngest accounts.
  • Some details including APIs and SDKs are subject to change based on user feedback.
  • There is no additional cost to using realtime. Realtime will be available to all Inngest billing plans at general availability, but final pricing is not yet determined.

Security

Realtime is secure by default. You can only subscribe to a channel's topics using time-sensitive tokens. The subscription token mechanism must be placed within your own protected API endpoints.

You must always specify the channel and topics when publishing data. This ensures that users can only access specific subsets of data within runs.

Delivery guarantees

Message delivery is currently at-most-once. We recommend that your users subscribe to a channel's topics as you invoke runs or send events to ensure delivery of data within a topic.