Publishing

There are three ways to publish realtime messages in the v4 SDK.

Prefer step.realtime.publish() whenever you can. It is durable and memoized, so it will not run again if the function retries. Reach for publish() or inngest.realtime.publish() only when you specifically need non-durable behavior, such as high-frequency token streaming or publishing from code outside a function.

Method                     | Durable | Usable outside functions | Step ID  | Best for
publish()                  | No      | No                       | -        | High-frequency progress inside a function handler
inngest.realtime.publish() | No      | Yes                      | -        | Publishing from routes, webhooks, or server-side code
step.realtime.publish()    | Yes     | No                       | Required | State transitions, final results, deduped publishes

publish(topicRef, data)

Available in the function handler arguments. Publishes immediately with no memoization, so retries re-fire the publish. Best for high-frequency, streaming-style updates where duplicates are acceptable.

  • topicRef (TopicRef<TData>, required): A topic accessor from a channel instance (e.g. ch.status). Carries the channel name, topic name, and schema.

  • data (TData, required): The message payload. Must match the topic's schema. Validated at runtime if the schema supports it.

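import OpenAI from "openai";
import { inngest } from "./client";
// Assumes pipelineChannel is exported from ./channels, like alertsChannel
// in the route example further down.
import { pipelineChannel } from "./channels";

// Reads OPENAI_API_KEY from the environment.
const openai = new OpenAI();

export const streamTokens = inngest.createFunction(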
  { id: "stream-tokens", triggers: [{ event: "app/generate" }] },
  async ({ event, publish, step }) => {
    const ch = pipelineChannel({ runId: event.data.runId });

    await step.run("generate", async () => {
      const stream = await openai.chat.completions.create({
        model: "gpt-4o-mini",
        stream: true,
        messages: [{ role: "user", content: event.data.prompt }],
      });

      let full = "";
      for await (const chunk of stream) {
        const token = chunk.choices[0]?.delta?.content ?? "";
        if (token) {
          full += token;

          // Non-durable on purpose. This can run again on retry, which is
          // usually acceptable for token streams.
          await publish(ch.tokens, { token });
        }
      }
      return full;
    });
  },
);

publish() works at the top level of your function handler and inside step.run() blocks. It is still non-durable, so it fires again on retry.

inngest.realtime.publish(topicRef, data)

Publishes from your Inngest client. Use this anywhere you already have the client available, such as API routes, webhooks, or other server-side code. Inside a function run, inngest.realtime.publish() also attaches the current run ID automatically.

  • topicRef (TopicRef<TData>, required): A topic accessor from a channel instance.

  • data (TData, required): The message payload. Must match the topic's schema.

Returns Promise<void>.

import { inngest } from "./client";
import { alertsChannel } from "./channels";

export async function POST(req: Request) {
  const body = await req.json();

  // Non-durable by design. Prefer step.realtime.publish() when this work
  // lives inside a function and duplicates on retry would be a problem.
  await inngest.realtime.publish(alertsChannel.alert, {
    message: body.message,
    severity: body.severity,
  });

  return new Response("OK");
}

step.realtime.publish(id, topicRef, data)

A durable step that memoizes the publish. If the function retries after this step has completed, the publish does not re-fire. The message appears in the function's execution graph. Best for important state transitions and final results.

  • id (string, required): A unique step ID. Used for memoization and appears in function logs.

  • topicRef (TopicRef<TData>, required): A topic accessor from a channel instance.

  • data (TData, required): The message payload. Must match the topic's schema.

Returns Promise<TData>, the published data.

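import { inngest } from "./client";
// Assumed locations: uploadsChannel in ./channels (like alertsChannel in the
// route example above), processUpload in a hypothetical ./uploads module.
import { uploadsChannel } from "./channels";
import { processUpload } from "./uploads";

export const processUploadFn = inngest.createFunction(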
  { id: "process-upload", triggers: [{ event: "app/upload" }] },
  async ({ event, publish, step }) => {
    const ch = uploadsChannel({ uploadId: event.data.uploadId });

    // This status update is ephemeral, so non-durable publish is fine.
    await publish(ch.status, { message: "Processing..." });

    const result = await step.run("process", async () => {
      return processUpload(event.data);
    });

    // Prefer the durable publish for important state that should not
    // duplicate if the function retries.
    await step.realtime.publish("publish-result", ch.result, {
      success: true,
      url: result.url,
    });
  },
);
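
The difference between the two calls in this example only shows up on retry. The following is a minimal, self-contained simulation of that behavior, not the SDK's implementation. The names sent, memo, stepPublish, and runAttempt are hypothetical, the model is synchronous for brevity, and it assumes a retry re-executes the handler from the top while completed steps are replayed from a memo keyed by step ID.

```typescript
// Hypothetical in-memory model of one function run. This is NOT the Inngest SDK.
type SentMessage = { topic: string; data: unknown };

const sent: SentMessage[] = [];           // everything a subscriber would receive
const memo = new Map<string, unknown>();  // completed step results, keyed by step ID

// Non-durable publish: sends every time it is called.
function publish(topic: string, data: unknown): void {
  sent.push({ topic, data });
}

// Durable publish: sends once per step ID, then replays the memoized result.
function stepPublish<T>(id: string, topic: string, data: T): T {
  if (memo.has(id)) return memo.get(id) as T;
  sent.push({ topic, data });
  memo.set(id, data);
  return data;
}

// One attempt of the handler. `failAfterPublish` simulates an error that
// happens after both publishes and therefore forces a retry.
function runAttempt(failAfterPublish: boolean): void {
  publish("status", { message: "Processing..." });
  stepPublish("publish-result", "result", { success: true });
  if (failAfterPublish) throw new Error("simulated failure");
}

try {
  runAttempt(true);   // first attempt fails after publishing
} catch {
  runAttempt(false);  // the retry re-executes the handler from the top
}

const statusCount = sent.filter((m) => m.topic === "status").length;
const resultCount = sent.filter((m) => m.topic === "result").length;
console.log({ statusCount, resultCount }); // { statusCount: 2, resultCount: 1 }
```

In this model the status message is sent twice and the result message once, which is why the ephemeral update uses publish() and the final result uses the durable step.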

Choosing a publish method

Prefer step.realtime.publish() by default when the publish happens inside a function and duplicates would be incorrect or noisy.

Use publish() when:

  • Streaming tokens, progress percentages, or log lines
  • The data is ephemeral and duplicates on retry are fine
  • You want minimum latency (no step overhead)

Use inngest.realtime.publish() when:

  • Publishing from outside a function (API routes, webhooks, cron jobs)
  • Reusing the client instance instead of the handler-scoped publish()
  • Sending non-durable updates where duplicates on retry are acceptable

Use step.realtime.publish() when:

  • Publishing a final result or state transition
  • The message must not be published again when the function retries (memoized)
  • The publish should appear in the function's execution graph
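
The guidance above can be condensed into a small decision helper. This is only an illustrative sketch: choosePublishMethod and PublishContext are hypothetical names that are not part of the SDK, and the three flags are a simplification of the criteria listed above. It also ignores the case where inngest.realtime.publish() is used inside a function to reuse the client instance.

```typescript
// Hypothetical helper that encodes the guidance above. Not part of the SDK.
type PublishMethod =
  | "publish()"
  | "inngest.realtime.publish()"
  | "step.realtime.publish()";

interface PublishContext {
  insideFunction: boolean;       // is the call made from a function handler?
  highFrequency: boolean;        // tokens, progress percentages, log lines
  duplicatesAcceptable: boolean; // is a repeated message on retry harmless?
}

function choosePublishMethod(ctx: PublishContext): PublishMethod {
  // Outside a function there is no handler-scoped publish() and no step.
  if (!ctx.insideFunction) return "inngest.realtime.publish()";
  // Inside a function, skip the step only for ephemeral, high-frequency data.
  if (ctx.highFrequency && ctx.duplicatesAcceptable) return "publish()";
  // Default: durable and memoized.
  return "step.realtime.publish()";
}

const tokenStream = choosePublishMethod({
  insideFunction: true,
  highFrequency: true,
  duplicatesAcceptable: true,
});
const finalResult = choosePublishMethod({
  insideFunction: true,
  highFrequency: false,
  duplicatesAcceptable: false,
});
const webhook = choosePublishMethod({
  insideFunction: false,
  highFrequency: false,
  duplicatesAcceptable: true,
});
console.log(tokenStream, finalResult, webhook);
```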

Type safety

All three methods check data against the topic's schema: at compile time through the TopicRef<TData> type, and at runtime when the schema supports validation:

const ch = pipelineChannel({ runId: "abc" });

// Compile-time checking - TypeScript rejects a payload that does not match the topic's type
await publish(ch.status, { message: "ok" }); // ✓
await publish(ch.status, { wrong: "field" }); // ✗ compile error

// Runtime validation - throws if data doesn't match Zod schema
await publish(ch.status, someUntypedData); // validated at runtime
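
To show how the two checks complement each other, here is a self-contained sketch that does not use the SDK or Zod. The TopicRef shape below is a hypothetical stand-in (the real type may differ). The validate predicate plays the role of the schema, and publishChecked, delivered, and the channel name "pipeline:abc" are invented for illustration.

```typescript
// Hypothetical stand-in for the SDK's topic reference. Only the idea matters:
// the type parameter drives compile-time checks, `validate` drives runtime checks.
interface TopicRef<TData> {
  channel: string;
  topic: string;
  validate: (value: unknown) => value is TData;
}

type Status = { message: string };

const statusTopic: TopicRef<Status> = {
  channel: "pipeline:abc",
  topic: "status",
  validate: (value: unknown): value is Status =>
    typeof value === "object" &&
    value !== null &&
    typeof (value as { message?: unknown }).message === "string",
};

const delivered: unknown[] = [];

// Publishes only if the payload passes the topic's runtime check.
function publishChecked<TData>(ref: TopicRef<TData>, data: TData): void {
  if (!ref.validate(data)) {
    throw new TypeError(`Invalid payload for ${ref.channel}/${ref.topic}`);
  }
  delivered.push(data);
}

publishChecked(statusTopic, { message: "ok" }); // passes both checks

// publishChecked(statusTopic, { wrong: "field" }); // would not compile

// Data typed as `any` bypasses the compiler, so only the runtime check catches it.
const someUntypedData: any = JSON.parse('{"wrong":"field"}');
let runtimeError = "";
try {
  publishChecked(statusTopic, someUntypedData);
} catch (err) {
  runtimeError = (err as Error).message;
}
console.log(delivered.length, runtimeError);
```

The compile-time check costs nothing at runtime but only covers data whose type the compiler knows. Payloads parsed from JSON or typed as any rely on the runtime check.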