Realtime: Stream updates from Inngest functions
Inngest Realtime enables you to stream updates from your functions, power live UIs, and implement bi-directional workflows such as Human-in-the-Loop. Use channels and topics to broadcast updates, stream logs, or await user input.
Pattern: Stream updates from a single function run
Enable users to follow the progress of a long-running task by streaming updates from a dedicated channel. Here's how to trigger a function and subscribe to its updates:
import crypto from "crypto";
import { inngest } from "@/inngest/client";
import { subscribe } from "@inngest/realtime";
export async function POST(req: Request) {
const json = await req.json();
const { prompt } = json;
const uuid = crypto.randomUUID();
await inngest.send({
name: "hello-world/hello",
data: { uuid },
});
const stream = await subscribe(inngest, {
channel: `hello-world.${uuid}`,
topics: ["logs"],
});
return new Response(stream.getEncodedStream(), {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
},
});
}
Your function can then publish updates to this channel:
import { Inngest } from "inngest";
import { realtimeMiddleware, channel, topic } from "@inngest/realtime";
const inngest = new Inngest({
id: "my-app",
middleware: [realtimeMiddleware()],
});
export const helloChannel = channel((uuid: string) => `hello-world.${uuid}`).addTopic(
topic("logs").type<string>()
);
export const someTask = inngest.createFunction(
{ id: "hello-world" },
{ event: "hello-world/hello" },
async ({ event, step, publish }) => {
const { uuid } = event.data;
await publish(helloChannel(uuid).logs("Hello, world!"));
}
);
By creating a channel with a unique identifier, you can stream updates for a specific run to the end user.
Pattern: Stream updates from multiple function runs
A Realtime channel can be used to stream updates from multiple function runs. Here, we'll define two channels: one global channel and one post-specific channel:
src/inngest/channels.ts
import {
channel,
topic,
} from "@inngest/realtime";
import { z } from "zod";
export const globalChannel = channel("global").addTopic(topic("logs").type<string>());
export const postChannel = channel((postId: string) => `post:${postId}`)
.addTopic(
topic("updated").schema(
z.object({
id: z.string(),
likes: z.number(),
})
)
)
.addTopic(
topic("deleted").schema(
z.object({
id: z.string(),
reason: z.string(),
})
)
);
Our likePost
function will publish updates to both channels:
src/inngest/functions/likePost.ts
import {
channel,
realtimeMiddleware,
subscribe,
topic,
} from "@inngest/realtime";
import { EventSchemas, Inngest } from "inngest";
import { globalChannel, postChannel } from "../channels";
export const likePost = app.createFunction(
{
id: "post/like",
retries: 0,
},
{
event: "app/post.like",
},
async ({
event: {
data: { postId = "123" },
},
step,
publish,
}) => {
if (!postId) {
await publish(
globalChannel().logs("Missing postId when trying to like post")
);
throw new Error("Missing postId");
}
await publish(globalChannel().logs(`Liking post ${postId}`));
// Fake a post update
const post = await step.run("Update likes", async () => {
const fakePost = {
id: "123",
likes: Math.floor(Math.random() * 10000),
};
return publish(postChannel(fakePost.id).updated(fakePost));
});
return post;
}
);
The globalChannel
will be used to stream updates for all posts, and the postChannel
will be used to stream updates for specific posts.
Human in the loop: Bi-directional workflows
Combine Realtime with waitForEvent()
to enable workflows that require user input, such as review or approval steps. Here's how to send a message to the user and wait for their confirmation:
import crypto from "crypto";
import { Inngest } from "inngest";
import { realtimeMiddleware, channel, topic } from "@inngest/realtime";
const inngest = new Inngest({
id: "my-app",
middleware: [realtimeMiddleware()],
});
export const agenticWorkflowChannel = channel("agentic-workflow").addTopic(
topic("messages").schema(
z.object({
message: z.string(),
confirmationUUid: z.string(),
})
)
);
export const agenticWorkflow = inngest.createFunction(
{ id: "agentic-workflow" },
{ event: "agentic-workflow/start" },
async ({ event, step, publish }) => {
await step.run(/* ... */);
const confirmationUUid = await step.run("get-confirmation-uuid", async () => crypto.randomUUID());
await publish(agenticWorkflowChannel().messages({
message: "Confirm to proceed?",
confirmationUUid,
}));
const confirmation = await step.waitForEvent("wait-for-confirmation", {
event: "agentic-workflow/confirmation",
timeout: "15m",
if: `async.data.confirmationUUid == \"${confirmationUUid}\"`,
});
if (confirmation) {
// continue workflow
}
}
);
The confirmationUUid
links the published message to the reply event, ensuring the correct user response is handled.