Realtime

The v4 SDK includes a built-in realtime system for streaming updates from your Inngest functions to client applications. There's no separate package to install: channels, publishing, and subscriptions are all part of inngest.

Realtime is built into the v4 SDK. If you're using v3, see the v3 realtime docs.

Key concepts

  • Channels define a named scope for messages. They can be static or parameterized with runtime values like a runId.
  • Topics are typed message streams within a channel, each with a schema.
  • Publishing sends data to a topic with publish() in a function handler, inngest.realtime.publish() from server-side code, or step.realtime.publish() for durable publishes.
  • Subscribing consumes messages via useRealtime, subscribe(), or the client aliases inngest.realtime.subscribe() and inngest.realtime.token().

As a rule of thumb, prefer step.realtime.publish() for publishes inside functions. It is memoized and durable, unlike publish() and inngest.realtime.publish(), which will fire again if the function retries.

Imports

WhatImport from
realtime, staticSchema"inngest"
useRealtime, getClientSubscriptionToken"inngest/react"
subscribe, getSubscriptionToken"inngest/realtime"
publish, step.realtime.publishFunction handler context (no import needed)
inngest.realtime.publish, inngest.realtime.subscribe, inngest.realtime.tokenYour Inngest client instance

Quick start

1. Define a channel

src/inngest/channels.ts
import { realtime, staticSchema } from "inngest";
import { z } from "zod";

export const pipelineChannel = realtime.channel({
  name: ({ runId }: { runId: string }) => `pipeline:${runId}`,
  topics: {
    status: {
      schema: z.object({ message: z.string(), step: z.string().optional() }),
    },
    tokens: {
      schema: staticSchema<{ token: string }>(),
    },
  },
});

2. Publish from a function

src/inngest/functions.ts
import { inngest } from "./client";
import { pipelineChannel } from "./channels";

export const generate = inngest.createFunction(
  { id: "generate", triggers: [{ event: "app/generate" }] },
  async ({ event, publish, step }) => {
    const ch = pipelineChannel({ runId: event.data.runId });

    //
    // Non-durable on purpose. Use this only when replay on retry is OK.
    await publish(ch.status, { message: "Starting..." });

    await step.run("stream-output", async () => {
      for (const token of ["Hello", " ", "world"]) {
        //
        // Non-durable on purpose. Token streams may replay on retry.
        await publish(ch.tokens, { token });
      }
    });

    //
    // Prefer the durable publish for important state changes.
    await step.realtime.publish("final-status", ch.status, {
      message: "Done!",
      step: "complete",
    });
  },
);

3. Subscribe from React

src/app/page.tsx
"use client";
import { useRealtime } from "inngest/react";
import { pipelineChannel } from "../inngest/channels";

export default function PipelinePage({ runId }: { runId: string }) {
  const ch = pipelineChannel({ runId });
  const topics = ["status", "tokens"] as const;

  const { connectionStatus, runStatus, messages } = useRealtime({
    channel: ch,
    topics,
    token: () =>
      fetch(`/api/realtime-token?runId=${runId}`).then((res) => res.text()),
  });

  return (
    <div>
      <p>Connection: {connectionStatus} | Run: {runStatus}</p>
      {messages.byTopic.status && (
        <p>Status: {messages.byTopic.status.data.message}</p>
      )}
      <ul>
        {messages.all.map((msg, i) => (
          <li key={i}>[{msg.topic}] {JSON.stringify(msg.data)}</li>
        ))}
      </ul>
    </div>
  );
}

Next steps

  • Channels & topics: defining channels, parameterized names, schemas, and type inference
  • Publishing: publish(), inngest.realtime.publish(), and step.realtime.publish()
  • useRealtime: full React hook API reference
  • Subscribing: getClientSubscriptionToken(), subscribe(), and client aliases