useRealtime
The useRealtime hook subscribes to realtime messages from Inngest functions in React components. It manages the WebSocket connection, reconnection, buffering, and provides typed access to messages by topic.
import { useRealtime } from "inngest/react";
import { pipelineChannel } from "../inngest/channels";
function Pipeline({ runId }: { runId: string }) {
const ch = pipelineChannel({ runId });
const topics = ["status", "tokens"] as const;
const { connectionStatus, runStatus, messages, result } = useRealtime({
channel: ch,
topics,
token: () =>
fetch(`/api/realtime-token?runId=${runId}`).then((r) => r.json()),
});
return (
<div>
<p>Connection: {connectionStatus} | Run: {runStatus}</p>
{messages.byTopic.status && (
<p>{messages.byTopic.status.data.message}</p>
)}
<p>Messages received: {messages.all.length}</p>
{result && <pre>{JSON.stringify(result, null, 2)}</pre>}
</div>
);
}
useRealtime(options)
- Name
channel- Type
- ChannelInstance | string
- Required
- optional
- Description
The channel to subscribe to. Can be a channel instance from
realtime.channel()or a plain string. When you use a channel instance, topic data is typed automatically.
- Name
topics- Type
- string[]
- Required
- optional
- Description
The topics to subscribe to within the channel. Required when your
tokenfactory returns only a token key string.
- Name
token- Type
- Token | () => Promise<string | ClientSubscriptionToken | Token>
- Required
- optional
- Description
Authentication for the subscription. Pass a pre-minted token object, or an async factory that returns a token key string, a
ClientSubscriptionTokenfromgetClientSubscriptionToken(), or a full token object. The async factory pattern is recommended because it runs on mount and on reconnect.
- Name
key- Type
- string
- Required
- optional
- Description
Optional subscription identity key. Change it to force the hook to reset its retained state and reconnect even if
channelandtopicsare unchanged.
- Name
enabled- Type
- boolean
- Required
- optional
- Description
Whether the subscription is active. Set to
falseto pause without unmounting. Defaults totrue.
- Name
validate- Type
- boolean
- Required
- optional
- Description
Enable subscriber-side schema validation on incoming messages. Defaults to
true.
- Name
historyLimit- Type
- number | null
- Required
- optional
- Description
Maximum number of messages to retain in
messages.all. Set tonullfor unbounded history. Defaults to100.
- Name
bufferInterval- Type
- number
- Required
- optional
- Description
Milliseconds to buffer incoming messages before triggering a re-render. Useful for high-frequency streams to reduce render pressure. Defaults to
0(immediate).
- Name
reconnect- Type
- boolean
- Required
- optional
- Description
Automatically reconnect on disconnect. Defaults to
true.
- Name
reconnectMinMs- Type
- number
- Required
- optional
- Description
Minimum delay between reconnect attempts in milliseconds. Defaults to
250.
- Name
reconnectMaxMs- Type
- number
- Required
- optional
- Description
Maximum delay between reconnect attempts in milliseconds (exponential backoff cap). Defaults to
5000.
- Name
pauseOnHidden- Type
- boolean
- Required
- optional
- Description
Pause the subscription when the browser tab is hidden, resuming when it becomes visible. Defaults to
true.
- Name
autoCloseOnTerminal- Type
- boolean
- Required
- optional
- Description
Automatically close the subscription when the function run reaches a terminal status (
completed,failed, orcancelled). Defaults totrue.
Return value
- Name
connectionStatus- Type
- "idle" | "connecting" | "open" | "paused" | "closed" | "error"
- Required
- optional
- Description
The WebSocket connection status.
- Name
runStatus- Type
- "unknown" | "running" | "completed" | "failed" | "cancelled"
- Required
- optional
- Description
The lifecycle status of the Inngest function run. Updated from run-level messages on the channel.
- Name
isPaused- Type
- boolean
- Required
- optional
- Description
Convenience boolean for
connectionStatus === "paused".
- Name
pauseReason- Type
- "hidden" | "disabled" | null
- Required
- optional
- Description
Why the hook is paused.
hiddenmeans the document is hidden andpauseOnHiddenis enabled.disabledmeansenabledisfalse.
- Name
messages.byTopic- Type
- Record<string, Message | undefined>
- Required
- optional
- Description
The latest message per subscribed topic. Access typed data with
messages.byTopic.topicName?.data.
- Name
messages.all- Type
- Message[]
- Required
- optional
- Description
All retained messages in chronological order, bounded by
historyLimit.
- Name
messages.last- Type
- Message | null
- Required
- optional
- Description
The most recently flushed message across all subscribed topics.
- Name
messages.delta- Type
- Message[]
- Required
- optional
- Description
The newest batch of flushed messages. When buffering is disabled, this is a single-message array for the latest message.
- Name
error- Type
- Error | null
- Required
- optional
- Description
The most recent connection error, or
nullif connected successfully.
- Name
result- Type
- unknown
- Required
- optional
- Description
The function's return value, extracted from the terminal run message when the function completes.
- Name
reset- Type
- () => void
- Required
- optional
- Description
Clears the retained messages, result, and error state and resets the hook back to its initial state.
Connection status
The connectionStatus field tracks the WebSocket connection lifecycle:
| Status | Description |
|---|---|
idle | Hook is mounted but hasn't started connecting (e.g., enabled: false) |
connecting | Establishing the WebSocket connection |
open | Connected and receiving messages |
paused | The hook is intentionally paused because enabled is false or the tab is hidden |
closed | Connection closed (manually or by autoCloseOnTerminal) |
error | Connection failed. Check error for details |
Run status
The runStatus field tracks the Inngest function's execution lifecycle:
| Status | Description |
|---|---|
unknown | No run status received yet |
running | Function is actively executing |
completed | Function finished successfully. result is available |
failed | Function failed after exhausting retries |
cancelled | Function was cancelled |
When autoCloseOnTerminal is true (the default), the subscription closes automatically once runStatus reaches completed, failed, or cancelled.
Token factory pattern
The recommended approach is to pass an async factory function for token. This function is called when the hook mounts and on each reconnect, ensuring fresh tokens.
// app/actions.ts
"use server";
import { getClientSubscriptionToken } from "inngest/react";
import { inngest } from "../inngest/client";
import { pipelineChannel } from "../inngest/channels";
export async function getToken(runId: string) {
return getClientSubscriptionToken(inngest, {
channel: pipelineChannel({ runId }),
topics: ["status", "tokens"],
});
}
// app/page.tsx
"use client";
import { useRealtime } from "inngest/react";
import { pipelineChannel } from "../inngest/channels";
import { getToken } from "./actions";
function Pipeline({ runId }: { runId: string }) {
const topics = ["status", "tokens"] as const;
const { connectionStatus, messages } = useRealtime({
channel: pipelineChannel({ runId }),
topics,
token: () => getToken(runId),
});
return (
<p>
{connectionStatus}: {messages.byTopic.status?.data.message}
</p>
);
}
Typed topic access
When you pass a channel instance instead of a plain string, messages.byTopic, messages.all, messages.last, and messages.delta all stay typed to the subscribed topics:
const ch = pipelineChannel({ runId });
const topics = ["status", "tokens"] as const;
const { messages } = useRealtime({
channel: ch,
topics,
token: () => getToken(runId),
});
messages.byTopic.status?.data.message; // string
messages.byTopic.tokens?.data.token; // string
for (const message of messages.delta) {
if (message.kind === "run") continue;
if (message.topic === "status") {
message.data.message; // string
}
}
History management
By default, messages.all retains the last 100 messages. Adjust with historyLimit:
// Keep last 500 messages
useRealtime({
channel: ch,
topics: ["status"],
token: () => getToken(runId),
historyLimit: 500,
});
// Keep all messages (unbounded, use with caution)
useRealtime({
channel: ch,
topics: ["status"],
token: () => getToken(runId),
historyLimit: null,
});
// Keep only 10 messages
useRealtime({
channel: ch,
topics: ["status"],
token: () => getToken(runId),
historyLimit: 10,
});
Buffering
For high-frequency streams like token-by-token AI output, use bufferInterval to batch re-renders:
const { messages } = useRealtime({
channel: ch,
topics: ["tokens"],
token: () => getToken(runId),
bufferInterval: 100, // Batch messages, re-render at most every 100ms
});
messages.delta; // up to 100ms of new messages per flush
messages.all; // retained history after each flush
messages.byTopic still tracks the latest message for each topic, while messages.all, messages.last, and messages.delta are flushed on the buffer interval.
Conditional subscription
Use enabled to start or stop the subscription without unmounting the component:
const [runId, setRunId] = useState<string | null>(null);
const channel = runId ? pipelineChannel({ runId }) : undefined;
const { connectionStatus, isPaused, pauseReason } = useRealtime({
channel,
topics: ["status"],
token: runId ? () => getToken(runId) : undefined,
enabled: !!runId,
});
connectionStatus; // "idle" until a run exists, then "connecting" | "open" | ...
isPaused; // true when disabled or hidden
pauseReason; // "disabled" | "hidden" | null
If your token factory returns only a string key, make sure channel and topics are present. If it returns a full token object, the hook can derive them from the token instead.