Realtime
The v4 SDK includes a built-in realtime system for streaming updates from your Inngest functions to client applications. There's no separate package to install: channels, publishing, and subscriptions are all part of inngest.
Realtime is built into the v4 SDK. If you're using v3, see the v3 realtime docs.
Key concepts
- Channels define a named scope for messages. They can be static or parameterized with runtime values like a
runId. - Topics are typed message streams within a channel, each with a schema.
- Publishing sends data to a topic with
publish()in a function handler,inngest.realtime.publish()from server-side code, orstep.realtime.publish()for durable publishes. - Subscribing consumes messages via
useRealtime,subscribe(), or the client aliasesinngest.realtime.subscribe()andinngest.realtime.token().
As a rule of thumb, prefer step.realtime.publish() for publishes inside
functions. It is memoized and durable, unlike publish() and
inngest.realtime.publish(), which will fire again if the function retries.
Imports
| What | Import from |
|---|---|
realtime, staticSchema | "inngest" |
useRealtime, getClientSubscriptionToken | "inngest/react" |
subscribe, getSubscriptionToken | "inngest/realtime" |
publish, step.realtime.publish | Function handler context (no import needed) |
inngest.realtime.publish, inngest.realtime.subscribe, inngest.realtime.token | Your Inngest client instance |
Quick start
1. Define a channel
src/inngest/channels.ts
import { realtime, staticSchema } from "inngest";
import { z } from "zod";
export const pipelineChannel = realtime.channel({
name: ({ runId }: { runId: string }) => `pipeline:${runId}`,
topics: {
status: {
schema: z.object({ message: z.string(), step: z.string().optional() }),
},
tokens: {
schema: staticSchema<{ token: string }>(),
},
},
});
2. Publish from a function
src/inngest/functions.ts
import { inngest } from "./client";
import { pipelineChannel } from "./channels";
export const generate = inngest.createFunction(
{ id: "generate", triggers: [{ event: "app/generate" }] },
async ({ event, publish, step }) => {
const ch = pipelineChannel({ runId: event.data.runId });
//
// Non-durable on purpose. Use this only when replay on retry is OK.
await publish(ch.status, { message: "Starting..." });
await step.run("stream-output", async () => {
for (const token of ["Hello", " ", "world"]) {
//
// Non-durable on purpose. Token streams may replay on retry.
await publish(ch.tokens, { token });
}
});
//
// Prefer the durable publish for important state changes.
await step.realtime.publish("final-status", ch.status, {
message: "Done!",
step: "complete",
});
},
);
3. Subscribe from React
src/app/page.tsx
"use client";
import { useRealtime } from "inngest/react";
import { pipelineChannel } from "../inngest/channels";
export default function PipelinePage({ runId }: { runId: string }) {
const ch = pipelineChannel({ runId });
const topics = ["status", "tokens"] as const;
const { connectionStatus, runStatus, messages } = useRealtime({
channel: ch,
topics,
token: () =>
fetch(`/api/realtime-token?runId=${runId}`).then((res) => res.text()),
});
return (
<div>
<p>Connection: {connectionStatus} | Run: {runStatus}</p>
{messages.byTopic.status && (
<p>Status: {messages.byTopic.status.data.message}</p>
)}
<ul>
{messages.all.map((msg, i) => (
<li key={i}>[{msg.topic}] {JSON.stringify(msg.data)}</li>
))}
</ul>
</div>
);
}
Next steps
- Channels & topics: defining channels, parameterized names, schemas, and type inference
- Publishing:
publish(),inngest.realtime.publish(), andstep.realtime.publish() useRealtime: full React hook API reference- Subscribing:
getClientSubscriptionToken(),subscribe(), and client aliases