Publishing
There are three ways to publish realtime messages in the v4 SDK.
Prefer step.realtime.publish() whenever you can. It is durable and
memoized, so it will not run again if the function retries. Reach for
publish() or inngest.realtime.publish() only when you specifically need
non-durable behavior, such as high-frequency token streaming or publishing
from code outside a function.
| Method | Durable | Usable outside functions | Step ID | Best for |
|---|---|---|---|---|
| publish() | No | No | - | High-frequency progress inside a function handler |
| inngest.realtime.publish() | No | Yes | - | Publishing from routes, webhooks, or server-side code |
| step.realtime.publish() | Yes | No | Required | State transitions, final results, deduped publishes |
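The difference between a durable and a non-durable publish can be illustrated with a toy model of memoization. This is a minimal sketch, not the SDK's implementation: Run, runStep, publishNow, handler, and simulate are hypothetical names, the calls are synchronous for brevity, and an in-memory array stands in for the realtime transport.

```typescript
// Toy model of step memoization. NOT the Inngest implementation;
// every name here is hypothetical.
type Run = { memo: Map<string, unknown> };

// Stands in for the realtime transport.
const sent: string[] = [];

// Non-durable publish: sends every time it is called.
function publishNow(message: string): void {
  sent.push(message);
}

// Durable step: runs `fn` once per step ID, then replays the stored result.
function runStep<T>(run: Run, id: string, fn: () => T): T {
  if (run.memo.has(id)) {
    return run.memo.get(id) as T;
  }
  const result = fn();
  run.memo.set(id, result);
  return result;
}

// A handler that fails on its first attempt, after both publishes.
function handler(run: Run, attempt: number): void {
  publishNow("progress"); // fires on every attempt
  runStep(run, "publish-result", () => {
    sent.push("result"); // fires only on the first attempt
    return "result";
  });
  if (attempt === 1) {
    throw new Error("transient failure");
  }
}

// Simulate one failed attempt followed by a retry of the same run.
function simulate(): string[] {
  sent.length = 0;
  const run: Run = { memo: new Map() };
  for (let attempt = 1; attempt <= 2; attempt++) {
    try {
      handler(run, attempt);
      break;
    } catch {
      // Retry with the same memoized state.
    }
  }
  return [...sent];
}
```

Calling simulate() returns ["progress", "result", "progress"]: the non-durable message is sent on both attempts, while the memoized step sends its message once.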
publish(topicRef, data)
Available in the function handler arguments. Publishes immediately with no memoization, so retries re-fire the publish. Best for high-frequency, streaming-style updates where duplicates are acceptable.
- topicRef (TopicRef<TData>, required): A topic accessor from a channel instance (e.g. ch.status). Carries the channel name, topic name, and schema.
- data (TData, required): The message payload. Must match the topic's schema. Validated at runtime if the schema supports it.
inngest.createFunction(
  { id: "stream-tokens", triggers: [{ event: "app/generate" }] },
  async ({ event, publish, step }) => {
    const ch = pipelineChannel({ runId: event.data.runId });

    await step.run("generate", async () => {
      const stream = await openai.chat.completions.create({
        model: "gpt-4o-mini",
        stream: true,
        messages: [{ role: "user", content: event.data.prompt }],
      });

      let full = "";
      for await (const chunk of stream) {
        const token = chunk.choices[0]?.delta?.content ?? "";
        if (token) {
          full += token;
          //
          // Non-durable on purpose. This can run again on retry, which is
          // usually acceptable for token streams.
          await publish(ch.tokens, { token });
        }
      }
      return full;
    });
  },
);
publish() works at the top level of your function handler and inside
step.run() blocks. It is still non-durable, so it fires again on retry.
inngest.realtime.publish(topicRef, data)
Publishes from your Inngest client. Use this anywhere you already have the client available, such as API routes, webhooks, or other server-side code. Inside a function run, inngest.realtime.publish() also attaches the current run ID automatically.
- topicRef (TopicRef<TData>, required): A topic accessor from a channel instance.
- data (TData, required): The message payload. Must match the topic's schema.
Returns Promise<void>.
import { inngest } from "./client";
import { alertsChannel } from "./channels";

export async function POST(req: Request) {
  const body = await req.json();
  //
  // Non-durable by design. Prefer step.realtime.publish() when this work
  // lives inside a function and duplicates on retry would be a problem.
  await inngest.realtime.publish(alertsChannel.alert, {
    message: body.message,
    severity: body.severity,
  });
  return new Response("OK");
}
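Because req.json() returns untyped data, it can help to narrow the body before publishing so the payload matches the topic's schema. The sketch below assumes an alert payload with a message string and one of three severity values; AlertPayload, the severity names, and parseAlertBody are hypothetical and not part of the SDK.

```typescript
// Hypothetical payload shape; the real alert topic schema is not shown here.
type AlertPayload = {
  message: string;
  severity: "info" | "warning" | "critical";
};

// Assumed severity values, for illustration only.
const SEVERITIES: readonly string[] = ["info", "warning", "critical"];

// Returns a typed payload, or null when the body does not match.
function parseAlertBody(body: unknown): AlertPayload | null {
  if (typeof body !== "object" || body === null) {
    return null;
  }
  const { message, severity } = body as Record<string, unknown>;
  if (typeof message !== "string") {
    return null;
  }
  if (typeof severity !== "string" || SEVERITIES.indexOf(severity) === -1) {
    return null;
  }
  return { message, severity: severity as AlertPayload["severity"] };
}
```

In a route like the one above, the handler could return a 400 response when parseAlertBody(body) is null and publish the returned payload otherwise.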
step.realtime.publish(id, topicRef, data)
A durable step that memoizes the publish. If the function retries past this step, the publish won't re-fire. The message appears in the function's execution graph. Best for important state transitions and final results.
- id (string, required): A unique step ID. Used for memoization and appears in function logs.
- topicRef (TopicRef<TData>, required): A topic accessor from a channel instance.
- data (TData, required): The message payload. Must match the topic's schema.
Returns Promise<TData>, the published data.
inngest.createFunction(
  { id: "process-upload", triggers: [{ event: "app/upload" }] },
  async ({ event, publish, step }) => {
    const ch = uploadsChannel({ uploadId: event.data.uploadId });
    //
    // This status update is ephemeral, so non-durable publish is fine.
    await publish(ch.status, { message: "Processing..." });

    const result = await step.run("process", async () => {
      return processUpload(event.data);
    });
    //
    // Prefer the durable publish for important state that should not
    // duplicate if the function retries.
    await step.realtime.publish("publish-result", ch.result, {
      success: true,
      url: result.url,
    });
  },
);
Choosing a publish method
Prefer step.realtime.publish() by default when the publish happens inside a
function and duplicates would be incorrect or noisy.
Use publish() when:
- Streaming tokens, progress percentages, or log lines
- The data is ephemeral and duplicates on retry are fine
- You want minimum latency (no step overhead)
Use inngest.realtime.publish() when:
- Publishing from outside a function (API routes, webhooks, cron jobs)
- Reusing the client instance instead of the handler-scoped publish()
- Sending non-durable updates where retries are acceptable
Use step.realtime.publish() when:
- Publishing a final result or state transition
- The publish must not fire again when the function retries (the step is memoized)
- The publish should appear in the function's execution graph
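The guidance above can be condensed into a small decision helper. This is only a sketch of the rules as stated in this section; PublishContext and choosePublishMethod are hypothetical names, and real call sites may weigh other factors such as latency or visibility in the execution graph.

```typescript
type PublishMethod =
  | "publish()"
  | "inngest.realtime.publish()"
  | "step.realtime.publish()";

// Hypothetical description of a publish call site.
interface PublishContext {
  // Is the code running inside an Inngest function handler?
  insideFunction: boolean;
  // Is a repeated message on retry harmless for subscribers?
  duplicatesAcceptable: boolean;
}

function choosePublishMethod(ctx: PublishContext): PublishMethod {
  // Outside a function, only the client-level publish is available.
  if (!ctx.insideFunction) {
    return "inngest.realtime.publish()";
  }
  // Inside a function, default to the durable step unless duplicates are fine.
  if (!ctx.duplicatesAcceptable) {
    return "step.realtime.publish()";
  }
  // Ephemeral, high-frequency updates where retries may re-send.
  return "publish()";
}

// Token streaming inside a handler: duplicates on retry are tolerable.
const forTokens = choosePublishMethod({
  insideFunction: true,
  duplicatesAcceptable: true,
});

// Final result inside a handler: should not repeat on retry.
const forResult = choosePublishMethod({
  insideFunction: true,
  duplicatesAcceptable: false,
});
```

Here forTokens is "publish()" and forResult is "step.realtime.publish()".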
Type safety
All three methods check data against the topic's schema. TypeScript checks the payload type at compile time, and the payload is validated at runtime if the schema supports it:
const ch = pipelineChannel({ runId: "abc" });
// Compile-time check - the payload type must match the topic's schema
await publish(ch.status, { message: "ok" }); // ✓
await publish(ch.status, { wrong: "field" }); // ✗ compile error
// Runtime validation - throws if data doesn't match Zod schema
await publish(ch.status, someUntypedData); // validated at runtime
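One way to picture how a topic reference can provide both checks is a generic type that carries the payload type alongside a runtime parser. The sketch below is an assumption for illustration only: TopicRefSketch, publishSketch, and the channel name are hypothetical and do not reflect the SDK's actual types or its schema integration.

```typescript
// Hypothetical stand-in for a topic reference; not the SDK's TopicRef.
interface TopicRefSketch<TData> {
  channel: string;
  topic: string;
  // Runtime check; in the SDK this role is played by the topic's schema.
  parse: (input: unknown) => TData;
}

const statusTopic: TopicRefSketch<{ message: string }> = {
  channel: "pipeline:abc", // made-up channel name
  topic: "status",
  parse(input) {
    if (
      typeof input === "object" &&
      input !== null &&
      typeof (input as { message?: unknown }).message === "string"
    ) {
      return { message: (input as { message: string }).message };
    }
    throw new Error("payload does not match the status schema");
  },
};

// The generic parameter ties the payload type to the topic at compile time;
// parse() re-checks the value at runtime.
function publishSketch<TData>(ref: TopicRefSketch<TData>, data: TData): TData {
  return ref.parse(data);
}

const accepted = publishSketch(statusTopic, { message: "ok" });
// publishSketch(statusTopic, { wrong: "field" }); // would not compile

// Data that bypassed the type checker is still rejected at runtime.
const untyped: any = { wrong: "field" };
let rejected = false;
try {
  publishSketch(statusTopic, untyped);
} catch {
  rejected = true;
}
```

The commented-out call shows the compile-time half: the object literal has no message field, so the type checker refuses it before the code runs. The untyped value reaches parse() and is rejected there instead.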