Subscribing
Server-side subscription covers two operations: minting tokens for client use and consuming streams on the server. The v4 SDK exposes both standalone helpers and client aliases:
getClientSubscriptionToken(app, ...)for minting client-safe tokenssubscribe({ app, ... })orinngest.realtime.subscribe(...)for server-side streams
getClientSubscriptionToken(app, options)
Mints a scoped subscription token on the server and returns a serializable object safe to pass to the client. The returned object includes the token key and the resolved API base URL, so useRealtime automatically connects to the right environment (cloud or dev server) without any client-side env var configuration.
- Name
app- Type
- Inngest
- Required
- required
- Description
Your Inngest client instance.
- Name
options.channel- Type
- ChannelInstance | string
- Required
- required
- Description
The channel to authorize. Can be a channel instance or a plain string.
- Name
options.topics- Type
- string[]
- Required
- required
- Description
The topics the token grants access to. The client can only subscribe to these topics.
Returns Promise<ClientSubscriptionToken> with key (the JWT string) and apiBaseUrl (the resolved server URL).
import { getClientSubscriptionToken } from "inngest/react";
import { inngest } from "./client";
import { pipelineChannel } from "./channels";
const ch = pipelineChannel({ runId: "abc123" });
const token = await getClientSubscriptionToken(inngest, {
channel: ch,
topics: ["status", "tokens"],
});
// token.key - JWT string for the client
// token.apiBaseUrl - resolved API URL (e.g. cloud or localhost)
Always mint tokens on the server. Never expose your Inngest signing key to the client. The token is scoped to the specified channel and topics, so a client cannot use it to access other channels.
The server resolves INNGEST_DEV and passes the correct API URL through the token. Your client code does not need NEXT_PUBLIC_INNGEST_DEV, VITE_INNGEST_DEV, or any other browser-side env var.
Framework examples
// app/actions.ts
"use server";
import { getClientSubscriptionToken } from "inngest/react";
import { inngest } from "../inngest/client";
import { pipelineChannel } from "../inngest/channels";
export async function getRealtimeToken(runId: string) {
return getClientSubscriptionToken(inngest, {
channel: pipelineChannel({ runId }),
topics: ["status", "tokens"],
});
}
subscribe(options)
Creates a server-side subscription to a realtime channel. Without onMessage, it returns a ReadableStream-based subscription object. With onMessage, it returns a callback subscription handle.
Stream subscription
Without onMessage, subscribe returns a stream subscription.
- Name
app- Type
- Inngest
- Required
- optional
- Description
Your Inngest client instance. Used to resolve connection details.
- Name
channel- Type
- ChannelInstance | string
- Required
- required
- Description
The channel to subscribe to.
- Name
topics- Type
- string[]
- Required
- required
- Description
The topics to subscribe to.
- Name
key- Type
- string
- Required
- optional
- Description
A pre-minted JWT token key. If not provided,
appis used to mint a token automatically.
- Name
validate- Type
- boolean
- Required
- optional
- Description
Enable schema validation on incoming messages. Defaults to
true.
import { subscribe } from "inngest/realtime";
import { inngest } from "./client";
import { pipelineChannel } from "./channels";
const ch = pipelineChannel({ runId: "abc123" });
const stream = await subscribe({
app: inngest,
channel: ch,
topics: ["status", "tokens"],
});
// ReadableStream - use getReader()
const reader = stream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
console.log(value.topic, value.data);
}
The same API is available as inngest.realtime.subscribe({ channel, topics }), which omits the app option because the client is already known.
Stream methods
The returned stream has additional helper methods:
- Name
getJsonStream()- Type
- ReadableStream<Message>
- Required
- optional
- Description
Returns a new
ReadableStreamthat emits parsed JSON messages. Each call creates a fresh reader view of future messages only.
- Name
getEncodedStream()- Type
- ReadableStream<Uint8Array>
- Required
- optional
- Description
Returns a new
ReadableStreamwith SSE-formattedUint8Arraychunks (data: {...}\n\n). Useful for piping the subscription through a streaming HTTP response.
- Name
close(reason?)- Type
- (reason?: string) => void
- Required
- optional
- Description
Closes the underlying WebSocket connection.
- Name
unsubscribe(reason?)- Type
- (reason?: string) => void
- Required
- optional
- Description
Alias for
close().
// Stream JSON to another consumer
const jsonStream = stream.getJsonStream();
const response = new Response(jsonStream);
// Stream as SSE
const sseStream = stream.getEncodedStream();
return new Response(sseStream, {
headers: { "Content-Type": "text/event-stream" },
});
// Clean up
stream.close();
Callback subscription
Pass onMessage to use an event-driven pattern instead of streams.
- Name
onMessage- Type
- (message: Message) => void
- Required
- required
- Description
Called for each incoming message.
- Name
onError- Type
- (error: unknown) => void
- Required
- optional
- Description
Called when a connection error occurs.
Returns Promise<{ close, unsubscribe }>.
import { subscribe } from "inngest/realtime";
import { inngest } from "./client";
import { pipelineChannel } from "./channels";
const ch = pipelineChannel({ runId: "abc123" });
const sub = await subscribe({
app: inngest,
channel: ch,
topics: ["status"],
onMessage: (message) => {
console.log(`[${message.topic}]`, message.data);
},
onError: (err) => {
console.error("Subscription error:", err);
},
});
// Clean up when done
sub.close();
Message shape
Each message received from a subscription includes:
- Name
topic- Type
- string
- Required
- optional
- Description
The topic name this message was published to.
- Name
channel- Type
- string
- Required
- optional
- Description
The resolved channel name.
- Name
data- Type
- TData
- Required
- optional
- Description
The message payload, typed according to the topic's schema.
- Name
kind- Type
- "data" | "run" | "datastream-start" | "datastream-end" | "chunk"
- Required
- optional
- Description
The message kind. Most topic publishes are
"data". Run lifecycle updates arrive as"run".
- Name
runId- Type
- string | undefined
- Required
- optional
- Description
The Inngest function run ID, if the message was published from within a function.
- Name
fnId- Type
- string | undefined
- Required
- optional
- Description
The Inngest function ID.
- Name
createdAt- Type
- Date
- Required
- optional
- Description
When the message was created.
Run messages are platform lifecycle events. Their data is intentionally broad, while topic messages remain typed from your channel schema.
Server-side stream example
A complete example using subscribe to monitor a long-running function and react to its output:
import { subscribe } from "inngest/realtime";
import { inngest } from "./client";
import { workflowChannel } from "./channels";
async function monitorWorkflow(runId: string) {
const ch = workflowChannel({ runId });
const stream = await subscribe({
app: inngest,
channel: ch,
topics: ["status", "result"],
});
const reader = stream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
if (value.topic === "status") {
console.log("Status:", value.data.message);
}
if (value.topic === "result") {
console.log("Result:", value.data);
stream.close();
}
}
}