# Durable Endpoints Streaming

Durable Endpoints can stream data back to clients in real time using Server-Sent Events (SSE). This lets you stream AI inference tokens, progress updates, or any other data, while keeping the durability guarantees of [durable steps](/docs-markdown/learn/inngest-steps).

Streaming works across multiple steps within a single endpoint invocation and continues uninterrupted when the endpoint transitions from sync to async mode. If a step fails and retries, the client is notified so it can discard any data streamed during the failed attempt.

> **Callout:** Durable Endpoints streaming is currently only available in the TypeScript SDK. This guide assumes you've already set up a Durable Endpoint.

## When to use streaming

- **AI inference** — Stream LLM tokens to the browser as they're generated, so users see results immediately.
- **Status updates** — Send progress messages during long-running endpoint executions.
- **Making existing streaming endpoints durable** — Wrap your existing streaming HTTP endpoints with steps to add retry and observability at no cost to functionality.

If you don't need to stream data directly to an HTTP client, consider using [Realtime](/docs-markdown/features/realtime) to push updates from background Inngest functions via pub/sub channels.

## Example

In this example, we'll create an HTTP endpoint that generates a haiku and then translates it to French. The client is the browser, which renders the haiku and its translation as the LLM output streams in.

### Server

Import `step` from `inngest` and `stream` from `inngest/experimental/durable-endpoints`, then use `stream.push()` or `stream.pipe()` inside your endpoint handler:

```typescript
import Anthropic from "@anthropic-ai/sdk";
import { step } from "inngest";
import { stream } from "inngest/experimental/durable-endpoints";
import { inngest } from "@/inngest";

export const GET = inngest.endpoint(async () => {
  // Option A: push() with an SDK event callback
  const text = await step.run("generate", async () => {
    stream.push("Generating...\n");

    const client = new Anthropic();
    const response = client.messages.stream({
      model: "claude-sonnet-4-20250514",
      max_tokens: 512,
      messages: [{ role: "user", content: "Write a haiku about durability." }],
    });

    response.on("text", (token) => stream.push(token));
    return await response.finalText();
  });

  // Option B: pipe() — streams each chunk AND returns the collected text
  await step.run("translate", async () => {
    stream.push(`\nTranslating...\n`);

    const client = new Anthropic();
    const response = client.messages.stream({
      model: "claude-sonnet-4-20250514",
      max_tokens: 256,
      messages: [{ role: "user", content: `Translate to French: ${text}` }],
    });

    return stream.pipe(async function* () {
      for await (const event of response) {
        if (
          event.type === "content_block_delta" &&
          event.delta.type === "text_delta"
        ) {
          yield event.delta.text;
        }
      }
    });
  });

  return new Response("\nDone!");
});
```

### Client

Use `fetchWithStream()` from `inngest/experimental/durable-endpoints/client` to consume the stream. It handles SSE parsing and sync-to-async redirects automatically, and reports commit and rollback events through callbacks. Chunks arrive on the client in the order they are pushed or yielded on the server.

```typescript
"use client";

import { useState, useRef } from "react";
import { fetchWithStream } from "inngest/experimental/durable-endpoints/client";

export default function Generate() {
  const [chunks, setChunks] = useState<string[]>([]);
  const uncommittedCountRef = useRef(0);

  async function run() {
    setChunks([]);
    uncommittedCountRef.current = 0;

    const resp = await fetchWithStream("/api/generate", {
      onData: ({ data }) => {
        if (typeof data === "string") {
          uncommittedCountRef.current++;
          setChunks((prev) => [...prev, data]);
        }
      },
      onRollback: () => {
        // A step failed and will retry — remove the chunks it produced
        const count = uncommittedCountRef.current;
        setChunks((prev) => prev.slice(0, prev.length - count));
        uncommittedCountRef.current = 0;
      },
      onCommit: () => {
        // Step completed — its chunks are now permanent
        uncommittedCountRef.current = 0;
      },
    });

    // The endpoint's return value is available as the Response body
    const result = await resp.text();
    setChunks((prev) => [...prev, result]);
  }

  return (
    <div>
      <button onClick={run}>Generate</button>
      <pre>{chunks.join("")}</pre>
    </div>
  );
}
```

## Server API

### `stream.push(data)`

Send a single chunk of data to the client as an SSE event.

```typescript
stream.push("Loading...");
stream.push({ progress: 50, message: "Halfway there" });
```

- Accepts any JSON-serializable value.
- Fire-and-forget. Does not block execution or return a value.
- No-op outside of an Inngest execution context, so your code works the same when called outside of a durable endpoint.

`push()` is ideal for one-off status messages or streaming via provider SDK event callbacks.

### `stream.pipe(source)`

Pipe a stream source to the client and resolve with the concatenated text of all chunks. Each chunk is sent as an SSE event in real-time.

The simplest case is piping a `ReadableStream`, like a `fetch` response body:

```typescript
const response = await fetch("https://api.example.com/stream");
const text = await stream.pipe(response.body);
// `text` contains the full response; the client received it chunk by chunk
```

When you need to transform or filter chunks before they're sent, pass an async generator function. Each `yield` sends one chunk to the client:

```typescript
const text = await stream.pipe(async function* () {
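  // `response` is a provider SDK stream, as in the server example above
  for await (const event of response) {
    // Only yield the parts you want the client to see
    if (
      event.type === "content_block_delta" &&
      event.delta.type === "text_delta"
    ) {
      yield event.delta.text;
    }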
  }
});
```

`pipe()` accepts three source types:

- **`ReadableStream`** — piped directly, decoded from bytes to string chunks.
- **`AsyncIterable<string>`** — each value in the iterable becomes a chunk.
- **`() => AsyncIterable<string>`** — a function that returns an async iterable. This is what lets you pass `async function*` generators directly to `pipe()`.

No-op outside of an Inngest execution context (resolves with an empty string).
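
To make the three source types concrete, here is a minimal sketch of how a `pipe()`-like helper could normalize them and collect the text. This is not the SDK's implementation: `PipeSource`, `toAsyncIterable`, `collectText`, and the `emit` callback are hypothetical names used only for illustration, and the real `stream.pipe()` may behave differently in its details.

```typescript
// Hypothetical union of the three source types described above
type PipeSource =
  | ReadableStream<Uint8Array>
  | AsyncIterable<string>
  | (() => AsyncIterable<string>);

// Normalizes any supported source into an async iterable of string chunks
async function* toAsyncIterable(source: PipeSource): AsyncIterable<string> {
  if (typeof source === "function") {
    // Generator function: call it to obtain the iterable
    yield* source();
    return;
  }
  if (source instanceof ReadableStream) {
    const reader = source.getReader();
    const decoder = new TextDecoder();
    try {
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        // `stream: true` keeps multi-byte characters intact across chunk boundaries
        const text = decoder.decode(value, { stream: true });
        if (text) yield text;
      }
      // Flush any bytes still buffered in the decoder
      const tail = decoder.decode();
      if (tail) yield tail;
    } finally {
      reader.releaseLock();
    }
    return;
  }
  // Already an async iterable of strings
  yield* source;
}

// Emits each chunk as it arrives and resolves with the concatenated text
async function collectText(
  source: PipeSource,
  emit: (chunk: string) => void,
): Promise<string> {
  let text = "";
  for await (const chunk of toAsyncIterable(source)) {
    emit(chunk);
    text += chunk;
  }
  return text;
}
```

In this sketch, `emit` stands in for whatever sends a chunk to the client, and the resolved string corresponds to the value `pipe()` resolves with.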

For the full `stream.push()` and `stream.pipe()` API reference, see the [Streaming reference](/docs-markdown/reference/typescript/v4/durable-endpoints#streaming).

## Client API

### `fetchWithStream(url, options)`

The primary way to consume a streaming Durable Endpoint. Import it from `inngest/experimental/durable-endpoints/client`:

```typescript
import { fetchWithStream } from "inngest/experimental/durable-endpoints/client";
```

`fetchWithStream()` returns a `Promise<Response>`. `await` it to drive the stream to completion. When the endpoint finishes, the returned `Response` contains the endpoint's final return value. If the endpoint does not use streaming, `fetchWithStream()` returns the raw `Response` as-is.

The core callbacks handle the majority of streaming use cases:

- **`onData({ data, hashedStepId })`** — Called for each chunk. `data` is the deserialized value; `hashedStepId` identifies which step produced it (or `null` if streamed outside a step). Data should be considered uncommitted until `onCommit` fires.
- **`onRollback({ hashedStepId })`** — Called when a step fails and will retry. Your code is responsible for tracking and removing the chunks produced by that step (see the [example above](#client) for a pattern using a ref counter).
- **`onCommit({ hashedStepId })`** — Called when a step completes successfully. Chunks from that step are now permanent and will never be rolled back.

Because `stream.push()` accepts any JSON-serializable value, `data` in the `onData` callback is typed as `unknown`. Narrow the type in your callback as needed:

```typescript
const uncommittedCount = { current: 0 };

const resp = await fetchWithStream("/api/generate", {
  onData: ({ data }) => {
    if (typeof data === "string") {
      uncommittedCount.current++;
      console.log("Chunk:", data);
    }
  },
  onRollback: () => {
    // Step failed and will retry: discard any uncommitted chunks you've kept, then reset the counter
    uncommittedCount.current = 0;
  },
  onCommit: () => {
    // Chunks are permanent — reset counter
    uncommittedCount.current = 0;
  },
});

const result = await resp.text();
```
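
When the server pushes objects rather than strings, a type guard keeps the narrowing in one place. The sketch below assumes the `{ progress, message }` payload shape from the `stream.push()` example; `ProgressUpdate`, `isProgressUpdate`, and `describeChunk` are hypothetical names, not part of the SDK.

```typescript
// Hypothetical payload shape, mirroring
// stream.push({ progress: 50, message: "Halfway there" })
interface ProgressUpdate {
  progress: number;
  message: string;
}

// Type guard: narrows the `unknown` data received in onData to ProgressUpdate
function isProgressUpdate(data: unknown): data is ProgressUpdate {
  if (typeof data !== "object" || data === null) return false;
  const candidate = data as Record<string, unknown>;
  return (
    typeof candidate.progress === "number" &&
    typeof candidate.message === "string"
  );
}

// Formats a chunk for display: strings pass through unchanged,
// progress updates get a percentage label, anything else is ignored
function describeChunk(data: unknown): string {
  if (typeof data === "string") return data;
  if (isProgressUpdate(data)) return `[${data.progress}%] ${data.message}`;
  return "";
}
```

Calling `describeChunk(data)` inside `onData` would then let one callback handle both token strings and structured progress messages.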

For all available options see the [full API reference](/docs-markdown/reference/typescript/v4/durable-endpoints#client-fetchwithstream).

## How it works

### Sync-to-async transitions

When a client calls a streaming Durable Endpoint, the SSE stream flows directly from your app to the client. If the endpoint needs to go async (e.g. due to `step.sleep()`, `step.waitForEvent()`, or a retry), the SDK sends a redirect event telling the client where to reconnect, and the stream continues through the Inngest server.

`fetchWithStream()` handles this redirect automatically. The client sees a single continuous stream regardless of sync-to-async transitions.

### Streaming activation

Streaming is activated lazily. The endpoint only sends an SSE response if:

- The client sends the `Accept: text/event-stream` header (which `fetchWithStream()` does automatically), **and**
- Your code calls `stream.push()` or `stream.pipe()` during execution.

If neither `push()` nor `pipe()` is called, the endpoint behaves like a regular non-streaming Durable Endpoint.

### Rollback on retry

Each chunk is tagged with the step that produced it (via `hashedStepId`). When a step completes, `onCommit` fires and those chunks become permanent. When a step fails and retries, `onRollback` fires and your client code should discard the uncommitted chunks from that step. On the retry attempt, the step streams fresh data that replaces what was rolled back. See the [example above](#client) for an implementation pattern.

Data streamed outside of a `step.run()` is never rolled back.
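
A counter works when only one step streams at a time. As an alternative, the client can track chunks per step using `hashedStepId`. The following is a minimal sketch of that idea, not an SDK feature: `StepChunkBuffer` and its method names are hypothetical, and it assumes that chunks streamed outside a step arrive with a `null` (or missing) `hashedStepId` and can be treated as permanent immediately.

```typescript
type StepId = string | null | undefined;

interface BufferedChunk {
  stepId: string | null;
  text: string;
  committed: boolean;
}

// Hypothetical client-side buffer that tracks which step produced each chunk
class StepChunkBuffer {
  private chunks: BufferedChunk[] = [];

  // Call from onData. Chunks with no step ID are never rolled back,
  // so they are marked committed right away.
  add(text: string, stepId: StepId): void {
    const id = stepId ?? null;
    this.chunks.push({ stepId: id, text, committed: id === null });
  }

  // Call from onCommit: the step's chunks become permanent
  commit(stepId: string): void {
    for (const chunk of this.chunks) {
      if (chunk.stepId === stepId) chunk.committed = true;
    }
  }

  // Call from onRollback: drop only the uncommitted chunks from that step
  rollback(stepId: string): void {
    this.chunks = this.chunks.filter(
      (chunk) => chunk.committed || chunk.stepId !== stepId,
    );
  }

  // Concatenated text of everything currently buffered, in arrival order
  text(): string {
    return this.chunks.map((chunk) => chunk.text).join("");
  }
}
```

Keying by step keeps a rollback from removing chunks that belong to a different step or that were streamed outside any step, at the cost of a little more bookkeeping than a single counter.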

### SSE event types

The stream uses SSE with the following event types. The `inngest.*` events are internal protocol events handled by `fetchWithStream()` automatically; only `inngest.stream` events contain user data.

| Event name              | Payload                                               | Purpose                                               |
| ----------------------- | ----------------------------------------------------- | ----------------------------------------------------- |
| `inngest.metadata`      | `{ runId }`                                           | Always first. Identifies the run.                     |
| `inngest.stream`        | `{ data, hashedStepId? }`                             | User data from `push()` / `pipe()`.                   |
| `inngest.commit`        | `{ hashedStepId }`                                    | Step succeeded. Its streamed data is permanent.       |
| `inngest.rollback`      | `{ hashedStepId }`                                    | Step failed. Discard its uncommitted data.            |
| `inngest.redirect_info` | `{ runId, url }`                                      | Tells the client to reconnect for async continuation. |
| `inngest.response`      | `{ status, response: { body, headers, statusCode } }` | Terminal event. Closes the stream.                    |
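
If you're inspecting the raw stream while debugging, it can help to see how one SSE frame maps to a row in the table. The sketch below parses a single frame using the standard SSE `event:` and `data:` fields. It assumes the payload is JSON-encoded in the `data` field, which this guide does not specify; `SseFrame` and `parseSseFrame` are hypothetical names. In application code, `fetchWithStream()` already does this for you.

```typescript
interface SseFrame {
  event: string;
  data: unknown;
}

// Parses one SSE frame (the text between blank-line separators).
// Returns null when the frame carries no data, e.g. a comment-only frame.
function parseSseFrame(frame: string): SseFrame | null {
  let event = "message"; // default event name in the SSE format
  const dataLines: string[] = [];

  for (const line of frame.split(/\r?\n/)) {
    if (line === "" || line.startsWith(":")) continue; // blank line or comment

    const colon = line.indexOf(":");
    const field = colon === -1 ? line : line.slice(0, colon);
    let value = colon === -1 ? "" : line.slice(colon + 1);
    if (value.startsWith(" ")) value = value.slice(1); // strip one leading space

    if (field === "event") event = value;
    else if (field === "data") dataLines.push(value);
  }

  if (dataLines.length === 0) return null;

  // Assumption: the payload is JSON. Multiple data lines are joined with "\n".
  return { event, data: JSON.parse(dataLines.join("\n")) };
}
```

Under that assumption, a frame such as `event: inngest.stream` followed by `data: {"data":"Hello","hashedStepId":"abc"}` would parse into an object whose `event` is `inngest.stream` and whose `data` matches the payload column above.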

## Limitations

Durable Endpoints streaming is currently in developer preview. In addition to any [general Durable Endpoint limitations](/docs-markdown/learn/durable-endpoints#limitations), the following apply:

- **15 minute timeout** — Client connections time out after 15 minutes, meaning your endpoint should complete within this window (including any retries) to ensure the stream is delivered end-to-end.
- **No rollback outside of steps** — Data streamed outside of a `step.run()` is never rolled back. If you need rollback guarantees, stream from within a step.
- **One streaming parallel step** — You can stream from at most one parallel step. Streaming from multiple parallel steps will result in interleaved output that cannot be disambiguated by the client.
- **No streaming from child functions** — `step.invoke()` calls cannot stream data back to the parent function's client.
- **Raw `Response` objects may be lost on async transition** — If your endpoint returns a `Response` (like a file download) and goes async, the Response is lost because it can't be memoized. Use `stream.push()` or `stream.pipe()` instead.

## SDK support

| SDK        | Support           | Version                         |
| ---------- | ----------------- | ------------------------------- |
| TypeScript | Developer Preview | >= 4.x (with `endpointAdapter`) |