Subscribing

Server-side subscription covers two operations: minting tokens for client use and consuming streams on the server. The v4 SDK exposes both standalone helpers and client aliases:

  • getClientSubscriptionToken(app, ...) for minting client-safe tokens
  • subscribe({ app, ... }) or inngest.realtime.subscribe(...) for server-side streams

getClientSubscriptionToken(app, options)

Mints a scoped subscription token on the server and returns a serializable object safe to pass to the client. The returned object includes the token key and the resolved API base URL, so useRealtime automatically connects to the right environment (cloud or dev server) without any client-side env var configuration.

  • Name
    app
    Type
    Inngest
    Required
    required
    Description

    Your Inngest client instance.

  • Name
    options.channel
    Type
    ChannelInstance | string
    Required
    required
    Description

    The channel to authorize. Can be a channel instance or a plain string.

  • Name
    options.topics
    Type
    string[]
    Required
    required
    Description

    The topics the token grants access to. The client can only subscribe to these topics.

Returns a Promise<ClientSubscriptionToken>. The resolved token includes key (the JWT string) and apiBaseUrl (the resolved API base URL).

import { getClientSubscriptionToken } from "inngest/react";
import { inngest } from "./client";
import { pipelineChannel } from "./channels";

const ch = pipelineChannel({ runId: "abc123" });

const token = await getClientSubscriptionToken(inngest, {
  channel: ch,
  topics: ["status", "tokens"],
});

// token.key        - JWT string for the client
// token.apiBaseUrl - resolved API URL (e.g. cloud or localhost)

Always mint tokens on the server. Never expose your Inngest signing key to the client. The token is scoped to the specified channel and topics, so a client cannot use it to access other channels.

The server resolves INNGEST_DEV and passes the correct API URL through the token. Your client code does not need NEXT_PUBLIC_INNGEST_DEV, VITE_INNGEST_DEV, or any other browser-side env var.

Framework examples

// app/actions.ts (Next.js Server Action)
"use server";
import { getClientSubscriptionToken } from "inngest/react";
import { inngest } from "../inngest/client";
import { pipelineChannel } from "../inngest/channels";

export async function getRealtimeToken(runId: string) {
  return getClientSubscriptionToken(inngest, {
    channel: pipelineChannel({ runId }),
    topics: ["status", "tokens"],
  });
}
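
If the client is allowed to ask for specific topics, the server can narrow that request to an allowlist before minting, so the token never grants more than intended. The following is a minimal sketch of that idea. ALLOWED_TOPICS and pickAllowedTopics are hypothetical names, not part of the SDK, and the topic names are borrowed from the example above.

```typescript
// Hypothetical allowlist: the only topics this server is willing to authorize.
const ALLOWED_TOPICS = ["status", "tokens"] as const;
type AllowedTopic = (typeof ALLOWED_TOPICS)[number];

// Keep only the requested topics that appear in the allowlist.
// Duplicates and unknown topics are dropped; allowlist order is preserved.
function pickAllowedTopics(requested: readonly string[]): AllowedTopic[] {
  const requestedSet = new Set(requested);
  return ALLOWED_TOPICS.filter((topic) => requestedSet.has(topic));
}
```

The filtered list could then be passed as topics to getClientSubscriptionToken. Rejecting the request when the list comes back empty avoids minting a token that authorizes nothing.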

subscribe(options)

Creates a server-side subscription to a realtime channel. Without onMessage, it returns a ReadableStream-based subscription object. With onMessage, it returns a callback subscription handle.

Stream subscription

When onMessage is omitted, subscribe accepts the following options and resolves to a stream subscription.

  • Name
    app
    Type
    Inngest
    Required
    optional
    Description

    Your Inngest client instance. Used to resolve connection details.

  • Name
    channel
    Type
    ChannelInstance | string
    Required
    required
    Description

    The channel to subscribe to.

  • Name
    topics
    Type
    string[]
    Required
    required
    Description

    The topics to subscribe to.

  • Name
    key
    Type
    string
    Required
    optional
    Description

    A pre-minted JWT token key. If not provided, app is used to mint a token automatically.

  • Name
    validate
    Type
    boolean
    Required
    optional
    Description

    Enable schema validation on incoming messages. Defaults to true.

import { subscribe } from "inngest/realtime";
import { inngest } from "./client";
import { pipelineChannel } from "./channels";

const ch = pipelineChannel({ runId: "abc123" });

const stream = await subscribe({
  app: inngest,
  channel: ch,
  topics: ["status", "tokens"],
});

// ReadableStream - use getReader()
const reader = stream.getReader();

while (true) {
  const { done, value } = await reader.read();
  if (done) break;

  console.log(value.topic, value.data);
}

The same API is available as inngest.realtime.subscribe({ channel, topics }), which omits the app option because the client is already known.

Stream methods

The returned stream has additional helper methods:

  • Name
    getJsonStream()
    Type
    ReadableStream<Message>
    Required
    optional
    Description

    Returns a new ReadableStream that emits parsed JSON messages. Each call returns an independent stream that receives only messages arriving after the call.

  • Name
    getEncodedStream()
    Type
    ReadableStream<Uint8Array>
    Required
    optional
    Description

    Returns a new ReadableStream with SSE-formatted Uint8Array chunks (data: {...}\n\n). Useful for piping the subscription through a streaming HTTP response.

  • Name
    close(reason?)
    Type
    (reason?: string) => void
    Required
    optional
    Description

    Closes the underlying WebSocket connection.

  • Name
    unsubscribe(reason?)
    Type
    (reason?: string) => void
    Required
    optional
    Description

    Alias for close().

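// Read parsed JSON messages from an independent stream.
// These chunks are objects, not bytes, so they cannot be used
// directly as a Response body.
const jsonStream = stream.getJsonStream();
const jsonReader = jsonStream.getReader();

// Inside a request handler: pipe the subscription out as SSE
const sseStream = stream.getEncodedStream();
return new Response(sseStream, {
  headers: { "Content-Type": "text/event-stream" },
});

// Clean up once the subscription is no longer needed
stream.close();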
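
On the receiving end of such an SSE response, the frames have to be split and parsed again. The sketch below assumes each frame has the data: {...}\n\n shape described above with a JSON payload, and that the bytes have already been decoded to text (for example with TextDecoder in streaming mode). parseSseBuffer is a hypothetical helper, not an SDK function.

```typescript
// Split buffered SSE text into complete frames and parse each JSON payload.
// Any trailing partial frame is returned in `rest` so the caller can prepend
// it to the next chunk of text.
function parseSseBuffer(buffer: string): { events: unknown[]; rest: string } {
  const events: unknown[] = [];
  let rest = buffer;
  let boundary = rest.indexOf("\n\n");

  while (boundary !== -1) {
    const frame = rest.slice(0, boundary);
    rest = rest.slice(boundary + 2);

    // Collect the data lines of this frame, dropping the "data:" prefix and
    // the single optional space that may follow it.
    const payload = frame
      .split("\n")
      .filter((line) => line.startsWith("data:"))
      .map((line) => line.slice(5).replace(/^ /, ""))
      .join("\n");

    if (payload.length > 0) {
      events.push(JSON.parse(payload));
    }
    boundary = rest.indexOf("\n\n");
  }

  return { events, rest };
}
```

Returning the unparsed remainder matters because network chunks do not line up with frame boundaries: a message may arrive split across two reads.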

Callback subscription

Pass onMessage to use an event-driven pattern instead of streams.

  • Name
    onMessage
    Type
    (message: Message) => void
    Required
    required
    Description

    Called for each incoming message.

  • Name
    onError
    Type
    (error: unknown) => void
    Required
    optional
    Description

    Called when a connection error occurs.

Returns Promise<{ close, unsubscribe }>.

import { subscribe } from "inngest/realtime";
import { inngest } from "./client";
import { pipelineChannel } from "./channels";

const ch = pipelineChannel({ runId: "abc123" });

const sub = await subscribe({
  app: inngest,
  channel: ch,
  topics: ["status"],
  onMessage: (message) => {
    console.log(`[${message.topic}]`, message.data);
  },
  onError: (err) => {
    console.error("Subscription error:", err);
  },
});

// Clean up when done
sub.close();
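
When one callback subscription covers several topics, onMessage tends to grow into a chain of topic checks. A small dispatcher keeps that readable. This is a sketch only: createTopicRouter and TopicMessage are hypothetical, and the message is assumed to carry the topic and data fields listed under Message shape.

```typescript
// Minimal local view of a message: only the fields the router needs.
interface TopicMessage {
  topic: string;
  data: unknown;
}

type TopicHandlers = Record<string, (data: unknown) => void>;

// Build an onMessage-style callback that forwards each message's data to the
// handler registered for its topic, or to onUnhandled when there is none.
function createTopicRouter(
  handlers: TopicHandlers,
  onUnhandled?: (message: TopicMessage) => void,
): (message: TopicMessage) => void {
  return (message) => {
    // Own-property check so topics like "toString" do not hit the prototype.
    const hasHandler = Object.prototype.hasOwnProperty.call(handlers, message.topic);
    const handler = hasHandler ? handlers[message.topic] : undefined;
    if (handler) {
      handler(message.data);
    } else {
      onUnhandled?.(message);
    }
  };
}
```

The returned function could be supplied as the onMessage option, with one entry in handlers per subscribed topic.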

Message shape

Each message received from a subscription includes:

  • Name
    topic
    Type
    string
    Required
    optional
    Description

    The topic name this message was published to.

  • Name
    channel
    Type
    string
    Required
    optional
    Description

    The resolved channel name.

  • Name
    data
    Type
    TData
    Required
    optional
    Description

    The message payload, typed according to the topic's schema.

  • Name
    kind
    Type
    "data" | "run" | "datastream-start" | "datastream-end" | "chunk"
    Required
    optional
    Description

    The message kind. Most topic publishes are "data". Run lifecycle updates arrive as "run".

  • Name
    runId
    Type
    string | undefined
    Required
    optional
    Description

    The Inngest function run ID, if the message was published from within a function.

  • Name
    fnId
    Type
    string | undefined
    Required
    optional
    Description

    The Inngest function ID.

  • Name
    createdAt
    Type
    Date
    Required
    optional
    Description

    When the message was created.

Run messages are platform lifecycle events, so their data is intentionally loosely typed. Topic messages remain typed from your channel schema.
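
Because kind distinguishes topic publishes from run lifecycle updates, a consumer will usually branch on it before touching data. The sketch below mirrors the documented fields in a local RealtimeMessage type, which is an assumption for illustration rather than the SDK's exported type. summarizeMessage is likewise hypothetical, and the remaining kinds are grouped together because their semantics are not described here.

```typescript
type MessageKind = "data" | "run" | "datastream-start" | "datastream-end" | "chunk";

// Local mirror of the documented message fields (not the SDK's own type).
interface RealtimeMessage<TData = unknown> {
  topic: string;
  channel: string;
  data: TData;
  kind: MessageKind;
  runId?: string;
  fnId?: string;
  createdAt: Date;
}

// Produce a short log line, branching on the message kind.
function summarizeMessage(msg: RealtimeMessage): string {
  switch (msg.kind) {
    case "data":
      return `[${msg.channel}/${msg.topic}] data`;
    case "run":
      return msg.runId ? `run update for ${msg.runId}` : "run update";
    case "datastream-start":
    case "datastream-end":
    case "chunk":
      return `[${msg.channel}/${msg.topic}] ${msg.kind}`;
    default: {
      // Compile-time exhaustiveness check: fails to type-check if a kind is missed.
      const unexpected: never = msg.kind;
      return `unknown kind: ${String(unexpected)}`;
    }
  }
}
```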

Server-side stream example

A complete example using subscribe to monitor a long-running function and react to its output:

import { subscribe } from "inngest/realtime";
import { inngest } from "./client";
import { workflowChannel } from "./channels";

async function monitorWorkflow(runId: string) {
  const ch = workflowChannel({ runId });

  const stream = await subscribe({
    app: inngest,
    channel: ch,
    topics: ["status", "result"],
  });

  const reader = stream.getReader();

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    if (value.topic === "status") {
      console.log("Status:", value.data.message);
    }

    if (value.topic === "result") {
      console.log("Result:", value.data);
      stream.close();
    }
  }
}
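
If a handler throws partway through the loop above, the subscription would stay open. One way to guarantee cleanup is to wrap the read loop in try/finally. The helper below is a sketch under assumptions: consumeUntil, ReaderLike, and ClosableStream are hypothetical names, typed structurally so that anything exposing getReader() and close(), as the stream subscription is documented to, could be passed in.

```typescript
type ReadResult<T> = { done: false; value: T } | { done: true; value?: unknown };

interface ReaderLike<T> {
  read(): Promise<ReadResult<T>>;
  releaseLock(): void;
}

interface ClosableStream<T> {
  getReader(): ReaderLike<T>;
  close(reason?: string): void;
}

// Read values until the stream ends or isFinal returns true for a value.
// The reader lock is released and the stream closed on every exit path,
// including when onValue or isFinal throws.
async function consumeUntil<T>(
  stream: ClosableStream<T>,
  isFinal: (value: T) => boolean,
  onValue: (value: T) => void,
): Promise<void> {
  const reader = stream.getReader();
  try {
    while (true) {
      const result = await reader.read();
      if (result.done) return;
      onValue(result.value);
      if (isFinal(result.value)) return;
    }
  } finally {
    reader.releaseLock();
    stream.close();
  }
}
```

In monitorWorkflow, isFinal could be a check such as value.topic === "result", which replaces the explicit stream.close() call inside the loop.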