# Subscribing

Server-side subscription covers two operations: **minting tokens** for client use and **consuming streams** on the server. The v4 SDK exposes both standalone helpers and client aliases:

- `getClientSubscriptionToken(app, ...)` for minting client-safe tokens
- `subscribe({ app, ... })` or `inngest.realtime.subscribe(...)` for server-side streams

***

## `getClientSubscriptionToken(app, options)`

Mints a scoped subscription token on the server and returns a serializable object safe to pass to the client. The returned object includes the token key and the resolved API base URL, so `useRealtime` automatically connects to the right environment (cloud or dev server) without any client-side env var configuration.

- `app` (Inngest): Your Inngest client instance.

* `options.channel` (ChannelInstance | string): The channel to authorize. Can be a channel instance or a plain string.

- `options.topics` (string\[]): The topics the token grants access to. The client can only subscribe to these topics.

Returns `Promise<ClientSubscriptionToken>` with `key` (the JWT string) and `apiBaseUrl` (the resolved server URL).

```ts
import { getClientSubscriptionToken } from "inngest/react";
import { inngest } from "./client";
import { pipelineChannel } from "./channels";

const ch = pipelineChannel({ runId: "abc123" });

const token = await getClientSubscriptionToken(inngest, {
  channel: ch,
  topics: ["status", "tokens"],
});

// token.key         - JWT string for the client
// token.apiBaseUrl   - resolved API URL (e.g. cloud or localhost)
```

> **Info:** Always mint tokens on the server. Never expose your Inngest signing key to the client. The token is scoped to the specified channel and topics, so a client cannot use it to access other channels.

> **Info:** The server resolves INNGEST\_DEV and passes the correct API URL through the token. Your client code does not need NEXT\_PUBLIC\_INNGEST\_DEV, VITE\_INNGEST\_DEV, or any other browser-side env var.

### Framework examples

```ts {{ title: "Next.js Server Action" }}
// app/actions.ts
"use server";
import { getClientSubscriptionToken } from "inngest/react";
import { inngest } from "../inngest/client";
import { pipelineChannel } from "../inngest/channels";

export async function getRealtimeToken(runId: string) {
  return getClientSubscriptionToken(inngest, {
    channel: pipelineChannel({ runId }),
    topics: ["status", "tokens"],
  });
}
```

```ts {{ title: "Express" }}
import express from "express";
import { getClientSubscriptionToken } from "inngest/react";
import { inngest } from "./client";
import { pipelineChannel } from "./channels";

const app = express();

app.get("/api/realtime-token", async (req, res) => {
  const { runId } = req.query;
  const token = await getClientSubscriptionToken(inngest, {
    channel: pipelineChannel({ runId: runId as string }),
    topics: ["status", "tokens"],
  });
  res.json(token);
});
```

```ts {{ title: "TanStack Start" }}
import { createServerFn } from "@tanstack/start";
import { getClientSubscriptionToken } from "inngest/react";
import { inngest } from "./inngest/client";
import { pipelineChannel } from "./inngest/channels";

export const getRealtimeToken = createServerFn({ method: "GET" })
  .validator((runId: string) => runId)
  .handler(async ({ data: runId }) => {
    return getClientSubscriptionToken(inngest, {
      channel: pipelineChannel({ runId }),
      topics: ["status", "tokens"],
    });
  });
```

***

## `subscribe(options)`

Creates a server-side subscription to a realtime channel. Without `onMessage`, it returns a `ReadableStream`-based subscription object. With `onMessage`, it returns a callback subscription handle.

### Stream subscription

Without `onMessage`, `subscribe` returns a stream subscription.

- `app` (Inngest): Your Inngest client instance. Used to resolve connection details.

* `channel` (ChannelInstance | string): The channel to subscribe to.

- `topics` (string\[]): The topics to subscribe to.

* `key` (string): A pre-minted JWT token key. If not provided, app is used to mint a token automatically.

- `validate` (boolean): Enable schema validation on incoming messages. Defaults to true.

```ts
import { subscribe } from "inngest/realtime";
import { inngest } from "./client";
import { pipelineChannel } from "./channels";

const ch = pipelineChannel({ runId: "abc123" });

const stream = await subscribe({
  app: inngest,
  channel: ch,
  topics: ["status", "tokens"],
});

// ReadableStream - use getReader()
const reader = stream.getReader();

while (true) {
  const { done, value } = await reader.read();
  if (done) break;

  console.log(value.topic, value.data);
}
```

> **Info:** The same API is available as inngest.realtime.subscribe(\{ channel, topics }), which omits the app option because the client is already known.

### Stream methods

The returned stream has additional helper methods:

- `getJsonStream()` (ReadableStream\<Message>): Returns a new ReadableStream that emits parsed JSON messages. Each call creates a fresh reader view of future messages only.

* `getEncodedStream()` (ReadableStream\<Uint8Array>): Returns a new ReadableStream with SSE-formatted Uint8Array chunks (data: \{...}\n\n). Useful for piping the subscription through a streaming HTTP response.

- `close(reason?)` ((reason?: string) => void): Closes the underlying WebSocket connection.

* `unsubscribe(reason?)` ((reason?: string) => void): Alias for close().

```ts
// Stream JSON to another consumer
const jsonStream = stream.getJsonStream();
const response = new Response(jsonStream);

// Stream as SSE
const sseStream = stream.getEncodedStream();
return new Response(sseStream, {
  headers: { "Content-Type": "text/event-stream" },
});

// Clean up
stream.close();
```

### Callback subscription

Pass `onMessage` to use an event-driven pattern instead of streams.

- `onMessage` ((message: Message) => void): Called for each incoming message.

* `onError` ((error: unknown) => void): Called when a connection error occurs.

Returns `Promise<{ close, unsubscribe }>`.

```ts
import { subscribe } from "inngest/realtime";
import { inngest } from "./client";
import { pipelineChannel } from "./channels";

const ch = pipelineChannel({ runId: "abc123" });

const sub = await subscribe({
  app: inngest,
  channel: ch,
  topics: ["status"],
  onMessage: (message) => {
    console.log(`[${message.topic}]`, message.data);
  },
  onError: (err) => {
    console.error("Subscription error:", err);
  },
});

// Clean up when done
sub.close();
```

## Message shape

Each message received from a subscription includes:

- `topic` (string): The topic name this message was published to.

* `channel` (string): The resolved channel name.

- `data` (TData): The message payload, typed according to the topic's schema.

* `kind` (\`"data" | "run" | "datastream-start" | "datastream-end" | "chunk"\`): The message kind. Most topic publishes are "data". Run lifecycle updates arrive as "run".

- `runId` (string | undefined): The Inngest function run ID, if the message was published from within a function.

* `fnId` (string | undefined): The Inngest function ID.

- `createdAt` (Date): When the message was created.

Run messages are platform lifecycle events. Their `data` is intentionally broad, while topic messages remain typed from your channel schema.

## Server-side stream example

A complete example using `subscribe` to monitor a long-running function and react to its output:

```ts
import { subscribe } from "inngest/realtime";
import { inngest } from "./client";
import { workflowChannel } from "./channels";

async function monitorWorkflow(runId: string) {
  const ch = workflowChannel({ runId });

  const stream = await subscribe({
    app: inngest,
    channel: ch,
    topics: ["status", "result"],
  });

  const reader = stream.getReader();

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    if (value.topic === "status") {
      console.log("Status:", value.data.message);
    }

    if (value.topic === "result") {
      console.log("Result:", value.data);
      stream.close();
    }
  }
}
```