Featured image for Developer Preview: Realtime blog post

Developer Preview: Realtime

Charly Poly· 5/21/2025 · 6 min read

We are thrilled to announce a new developer preview: Realtime.

This feature allows you to stream updates from your Inngest functions to your users using a new publish() method available in the Inngest TypeScript and Go SDKs.

Realtime updates are now a core requirement for any web application, especially when long-running tasks are involved. The rise of AI patterns such as AI Agents has further emphasized the need for such features among the many developers building AI applications with Inngest.

With Realtime, you can keep your users informed of a specific task's progress (e.g., a long-running AI Agent execution), or even of multiple tasks at once (e.g., a workflow involving multiple Inngest functions). Enable bi-directional communication between your users and your Inngest functions by combining Realtime with waitForEvent() (e.g., a Human in the Loop).

publish(): a flexible and fully typed Realtime API

As with all features in the Inngest TypeScript SDK, publish() is fully type-safe and serves as a building block for multiple use cases.

The subscriber/publisher communication is built around typed channels, which encompass optional topics for more fine-grained control:

src/inngest/functions/hello-world.ts
import { Inngest } from "inngest";
import { realtimeMiddleware, channel, topic } from "@inngest/realtime";
const inngest = new Inngest({
id: "my-app",
// Whenever you create your app, include the `realtimeMiddleware()`
middleware: [realtimeMiddleware()],
});
export const helloChannel = channel("hello-world").addTopic(
topic("logs").type<string>()
);
export const someTask = inngest.createFunction(
{ id: "hello-world" },
{ event: "hello-world/hello" },
async ({ event, step, publish }) => {
// Publish data to the given channel, on the given topic.
await publish(helloChannel().logs("Hello, world!"));
}
);

Then, subscribing to a channel is easily achieved by using the useInngestSubscription() hook from your React components:

src/actions.tsx
"use server";
// securely fetch an Inngest Realtime subscription token from the server as a server action
export async function fetchSubscriptionToken(): Promise<Realtime.Token<typeof helloChannel, ["logs"]>> {
const token = await getSubscriptionToken(getInngestApp(), {
channel: helloChannel(),
topics: ["logs"],
});
return token;
}
src/App.tsx
tsx
"use client";
import { useInngestSubscription } from "@inngest/realtime/hooks";
// import the server action to securely fetch the Realtime subscription token
import { fetchRealtimeSubscriptionToken } from "./actions";
export default function Home() {
// subscribe to the hello-world channel via the subscription token
// `data` is fully typed based on the selected channel and topics!
const { data, error } = useInngestSubscription({
refreshToken: fetchRealtimeSubscriptionToken,
});
return (
<div>
<h1>Realtime</h1>
{data.map((message, i) => (
<div key={i}>{message.data}</div>
))}
</div>
)
}

Realtime can also be subscribed directly from the server using the subscribe() function.

Realtime is built upon Inngest's protocol, ensuring secure, low-latency and at-most-once delivery of messages. Its combination of channels and topics enables many use cases, let's cover a few of them.

Realtime in practice

Stream updates from a single Inngest function run

Enabling your users to follow the progress of a specific long-running task is a common use case. Achieving this is done by providing a unique identifier to the event triggering function, later used to create a unique topic for the subscriber to subscribe to.

First, our backend will trigger our hello-world function, providing a unique identifier in the event payload while using it to subscribe to the dedicated run channel:

app/api/hello-world/route.ts
import crypto from "crypto";
import { inngest } from "@/inngest/client";
import { subscribe } from "@inngest/realtime";
export async function POST(req: Request) {
const json = await req.json();
const { prompt } = json;
const uuid = crypto.randomUUID();
await inngest.send({
name: "hello-world/hello",
data: {
uuid,
},
});
const stream = await subscribe(inngest, {
channel: `hello-world.${uuid}`, // subscribe to the dedicated run channel
topics: ["logs"],
});
return new Response(stream.getEncodedStream(), {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
},
});
}

Our hello-world function will then publish updates to the dedicated run channel using the provided unique identifier:

src/inngest/functions/hello-world.ts
import { Inngest } from "inngest";
import { realtimeMiddleware, channel, topic } from "@inngest/realtime";
const inngest = new Inngest({
id: "my-app",
// Whenever you create your app, include the `realtimeMiddleware()`
middleware: [realtimeMiddleware()],
});
// Create a channel that will be used to publish updates to the dedicated run
export const helloChannel = channel((uuid: string) => `hello-world.${uuid}`).addTopic(
topic("logs").type<string>()
);
export const someTask = inngest.createFunction(
{ id: "hello-world" },
{ event: "hello-world/hello" },
async ({ event, step, publish }) => {
const { uuid } = event.data;
// Publish data to the dynamically created channel, on the given topic.
await publish(helloChannel(uuid).logs("Hello, world!"));
}
);

By creating a channel that accepts a parameter, we easily implemented run-specific channels, allowing users to get updates from a specific run.

Implementation of a Human in the loop

Combining Realtime with waitForEvent() enables bi-directional communication between your users and your Inngest functions, allowing you to implement a Human-in-the-Loop pattern and enable your users to review an ongoing AI workflow.

Here, our Agentic Workflow sends an update to the user and waits for their confirmation to proceed:

src/inngest/functions/agentic-workflow.ts
import crypto from "crypto";
import { Inngest } from "inngest";
import { realtimeMiddleware, channel, topic } from "@inngest/realtime";
const inngest = new Inngest({
id: "my-app",
// Whenever you create your app, include the `realtimeMiddleware()`
middleware: [realtimeMiddleware()],
});
// Create a channel that will be used to publish updates to the dedicated run
export const agenticWorkflowChannel = channel("agentic-workflow").addTopic(
topic("messages").schema(
z.object({
message: z.string(),
confirmationUUid: z.string(),
})
)
);
export const agenticWorkflow = inngest.createFunction(
{ id: "agentic-workflow" },
{ event: "agentic-workflow/start" },
async ({ event, step, publish }) => {
await step.run(/* ... */)
// Generate a unique identifier for the confirmation
const confirmationUUid = await step.run("get-confirmation-uuid", async () => {
return crypto.randomUUID();
})
// Ask the user to confirm their choice
await publish(agenticWorkflowChannel().messages({
message: "Confirm to proceed?",
confirmationUUid,
}));
// Wait for the user to confirm their choice
const confirmation = await step.waitForEvent("wait-for-confirmation", {
event: "agentic-workflow/confirmation",
timeout: "15m",
// "async" is the "agentic-workflow/confirmation" event here:
if: `async.data.confirmationUUid == "${confirmationUUid}"`,
});
if (confirmation) {
// continue workflow
}
}
);

In the above example, the confirmationUUid is passed from the published message to the reply event, allowing you to identify a unique user confirmation.

You will find the complete source code of the above examples in the Realtime examples page.

Give us your feedback

Realtime is now available as a developer preview in the Inngest TypeScript and Go SDKs. Some details including APIs are still subject to change during this period. Read more about the developer preview here.

We are excited to see what you will build with Realtime and look forward to your feedback!