Example middleware
v2.0.0+

The following examples show how you might use middleware in some real-world scenarios.


Cloudflare Workers AI

Workers AI lets you run machine learning models on the Cloudflare network from your own code, which in this case is triggered by Inngest.

To use the @cloudflare/ai package, you need access to the env object passed to a Workers route handler. This argument is usually abstracted away by a serve handler, but middleware can access arguments passed to the request.

Use this along with mutating function input to set a new ai property that you can use within functions, like in the following example:

import { Ai } from "@cloudflare/ai";
import { InngestMiddleware } from "inngest";

interface Env {
  // If you set another name in wrangler.toml as the value for 'binding',
  // replace "AI" with the variable name you defined.
  AI: any;
}

export const cloudflareMiddleware = new InngestMiddleware({
  name: "Inngest: Workers AI",
  init: () => {
    return {
      onFunctionRun: ({ reqArgs }) => {
        const [, env] = reqArgs as [Request, Env];
        const ai = new Ai(env.AI);

        return {
          transformInput: () => {
            return { ctx: { ai } };
          },
        };
      },
    };
  },
});

import { inngest } from "./client";

export default inngest.createFunction(
  { id: "hello-world" },
  { event: "demo/event.sent" },
  async ({ ai }) => {
    // `ai` is typed and can be used directly or within a step
    const response = await ai.run("@cf/meta/llama-2-7b-chat-int8", {
      prompt: "What is the origin of the phrase Hello, World",
    });
  }
);

Common actions for every function

You likely reuse the same steps across many functions. Whether it's fetching user data or sending an email, your app is hopefully full of reusable blocks of code.

We could add some middleware to pass these into any Inngest function, automatically wrapping them in step.run() and allowing the code inside our function to feel a little bit cleaner.

/**
 * Pass to a client to provide a set of actions as steps to all functions, or to
 * a function to provide a set of actions as steps only to that function.
 */
const inngest = new Inngest({
  id: "my-app",
  middleware: [
    createActionsMiddleware({
      getUser(id: string) {
        return db.user.get(id);
      },
    }),
  ],
});

inngest.createFunction(
  { id: "user-data-dump" },
  { event: "app/data.requested" },
  async ({ event, action: { getUser } }) => {
    // The first parameter is the step's options or ID
    const user = await getUser("get-user-details", event.data.userId);
  }
);

import { InngestMiddleware, StepOptionsOrId } from "inngest";

/**
 * Create a middleware that wraps a set of functions in step tooling, allowing
 * them to be invoked directly instead of using `step.run()`.
 *
 * This is useful for providing a set of common actions to a particular function
 * or to all functions created by a client.
 */
export const createActionsMiddleware = <T extends Actions>(rawActions: T) => {
  return new InngestMiddleware({
    name: "Inngest: Actions",
    init: () => {
      return {
        onFunctionRun: () => {
          return {
            transformInput: ({ ctx: { step } }) => {
              const action: FilterActions<T> = Object.entries(
                rawActions
              ).reduce((acc, [key, value]) => {
                if (typeof value !== "function") {
                  return acc;
                }

                const action = (
                  idOrOptions: StepOptionsOrId,
                  ...args: unknown[]
                ) => {
                  return step.run(idOrOptions, () => value(...args));
                };

                return {
                  ...acc,
                  [key]: action,
                };
              }, {} as FilterActions<T>);

              return {
                ctx: { action },
              };
            },
          };
        },
      };
    },
  });
};

type Actions = Record<string, unknown>;

/**
 * Filter out all keys from `T` where the associated value does not match type
 * `U`.
 */
type KeysNotOfType<T, U> = {
  [P in keyof T]: T[P] extends U ? never : P;
}[keyof T];

/**
 * Given a set of generic objects, extract any top-level functions and
 * appropriately shim their types.
 *
 * We use this type to allow users to spread a set of functions into the
 * middleware without having to worry about non-function properties.
 */
type FilterActions<Fns extends Record<string, any>> = {
  [K in keyof Omit<Fns, KeysNotOfType<Fns, (...args: any[]) => any>>]: (
    idOrOptions: StepOptionsOrId,
    ...args: Parameters<Fns[K]>
  ) => Promise<Awaited<ReturnType<Fns[K]>>>;
};
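
To see what the wrapping does in isolation, the following is a minimal sketch that runs without the SDK. FakeStep, fakeStep, recordedStepIds and wrapActions are hypothetical stand-ins invented for this illustration; the stand-in only records the step ID and runs the callback, with none of the memoization or retries that a real step.run() provides.

```typescript
// Stand-in for the SDK's step tooling. Hypothetical: it only records the step
// ID and runs the callback, with none of Inngest's memoization or retries.
type FakeStep = {
  run: (id: string, fn: () => unknown) => Promise<unknown>;
};

const recordedStepIds: string[] = [];

const fakeStep: FakeStep = {
  run: async (id, fn) => {
    recordedStepIds.push(id);
    return fn();
  },
};

type AnyFn = (...args: any[]) => any;
type WrappedAction = (id: string, ...args: unknown[]) => Promise<unknown>;

// Mirrors the reduce in the middleware: keep only function values and wrap
// each one so that its first argument becomes the step ID.
const wrapActions = (step: FakeStep, rawActions: Record<string, unknown>) => {
  const wrapped: Record<string, WrappedAction> = {};

  for (const [key, value] of Object.entries(rawActions)) {
    if (typeof value !== "function") continue;

    wrapped[key] = (id, ...args) =>
      step.run(id, () => (value as AnyFn)(...args));
  }

  return wrapped;
};

const actions = wrapActions(fakeStep, {
  getUser: (id: string) => ({ id, name: "Ada" }),
  apiVersion: 2, // not a function, so it is skipped
});

// Equivalent to: step.run("get-user-details", () => getUser("user_123"))
const pendingUser = actions.getUser("get-user-details", "user_123");
```

The sketch drops the FilterActions typing for brevity, so the wrapped actions here return Promise<unknown>; the middleware above keeps each action's own parameter and return types.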

E2E Encryption

Inngest helps memoize state between steps within a function, but you may want to encrypt this, ensuring plaintext data never leaves your server.

Our @inngest/middleware-encryption package uses this method to encrypt and decrypt data.

⚠️ If you encrypt your step data and lose your encryption key, you'll lose access to all encrypted state. Be careful!

const inngest = new Inngest({
  id: "my-app",
  middleware: [stepEncryptionMiddleware()],
});

inngest.createFunction(
  { id: "example-function" },
  { event: "app/user.created" },
  async ({ event, step }) => {
    /**
     * The return value of `db.get()` - and therefore the value of `user` is now
     * silently encrypted and decrypted by the middleware; no plain-text step
     * data leaves your server or is stored in Inngest Cloud.
     */
    const user = await step.run("get-user", () =>
      db.get("user", event.data.userId)
    );
  }
);

This example's "encryption" is just stringifying and reversing the value - in practice you'll want to replace this with your own method using something like node:crypto.

import { InngestMiddleware } from "inngest";

const encryptionMarker = "__ENCRYPTED__";
type EncryptedValue = { [encryptionMarker]: true; data: string };

export const stepEncryptionMiddleware = (
  key: string = process.env.INNGEST_ENCRYPTION_KEY as string
) => {
  if (!key) {
    throw new Error("Missing INNGEST_ENCRYPTION_KEY environment variable");
  }

  // Some internal functions that we'll use to encrypt and decrypt values.
  // In practice, you'll want to use the `key` passed in to handle encryption
  // properly.
  const isEncryptedValue = (value: unknown): value is EncryptedValue => {
    return (
      typeof value === "object" &&
      value !== null &&
      encryptionMarker in value &&
      value[encryptionMarker] === true &&
      "data" in value &&
      typeof value["data"] === "string"
    );
  };

  const encrypt = (value: unknown): EncryptedValue => {
    return {
      [encryptionMarker]: true,
      data: JSON.stringify(value).split("").reverse().join(""),
    };
  };

  const decrypt = <T>(value: T): T => {
    if (isEncryptedValue(value)) {
      return JSON.parse(value.data.split("").reverse().join("")) as T;
    }

    return value;
  };

  return new InngestMiddleware({
    name: "Step Encryption Middleware",
    init: () => ({
      onFunctionRun: () => ({
        // Decrypt memoized step data before it reaches the function
        transformInput: ({ steps }) => ({
          steps: steps.map((step) => ({
            ...step,
            data: step.data && decrypt(step.data),
          })),
        }),
        transformOutput: (ctx) => {
          if (!ctx.step) {
            return;
          }

          return {
            result: {
              data: ctx.result.data && encrypt(ctx.result.data),
            },
          };
        },
      }),
    }),
  });
};
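
As a rough idea of what swapping the string reversal for real encryption could look like, here is a sketch using AES-256-GCM from node:crypto. The helper names (deriveKey, encryptWithKey, decryptWithKey) are hypothetical, and hashing the key string with SHA-256 to get a 32-byte key is a simplifying assumption; you may prefer a proper key derivation function or a pre-generated random key.

```typescript
import {
  createCipheriv,
  createDecipheriv,
  createHash,
  randomBytes,
} from "node:crypto";

const encryptionMarker = "__ENCRYPTED__";
type EncryptedValue = { [encryptionMarker]: true; data: string };

// Assumption: hash the configured key string down to the 32 bytes that
// AES-256 requires. This is a simplification, not a hardened KDF.
const deriveKey = (secret: string): Buffer =>
  createHash("sha256").update(secret).digest();

const encryptWithKey = (secret: string, value: unknown): EncryptedValue => {
  const iv = randomBytes(12); // fresh 96-bit nonce for every message
  const cipher = createCipheriv("aes-256-gcm", deriveKey(secret), iv);
  const ciphertext = Buffer.concat([
    cipher.update(JSON.stringify(value), "utf8"),
    cipher.final(),
  ]);
  const tag = cipher.getAuthTag(); // 16-byte authentication tag

  return {
    [encryptionMarker]: true,
    // Pack nonce, tag and ciphertext into one base64 string
    data: Buffer.concat([iv, tag, ciphertext]).toString("base64"),
  };
};

const decryptWithKey = <T>(secret: string, value: EncryptedValue): T => {
  const raw = Buffer.from(value.data, "base64");
  const iv = raw.subarray(0, 12);
  const tag = raw.subarray(12, 28);
  const ciphertext = raw.subarray(28);

  const decipher = createDecipheriv("aes-256-gcm", deriveKey(secret), iv);
  decipher.setAuthTag(tag);

  // final() throws if the key is wrong or the data was tampered with
  const plaintext = Buffer.concat([
    decipher.update(ciphertext),
    decipher.final(),
  ]);

  return JSON.parse(plaintext.toString("utf8")) as T;
};
```

These helpers keep the same EncryptedValue shape as the example above, so they could stand in for encrypt and decrypt with the key closed over. GCM is used here because it also authenticates the data, so a wrong key or a modified payload fails loudly instead of returning garbage.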

We could expand this middleware to also transform event data entering and leaving the SDK. Be aware that, unlike step data, event data is much more commonly shared between systems, so consider whether you really need to encrypt your event data as well before doing so.

new InngestMiddleware({
  name: "Full Encryption Middleware",
  init: () => ({
    onSendEvent: () => ({
      transformInput: ({ payloads }) => ({
        payloads: payloads.map((payload) => ({
          ...payload,
          data: payload.data && encrypt(payload.data),
        })),
      }),
    }),
    onFunctionRun: () => ({
      transformInput: ({ ctx, steps }) => ({
        steps: steps.map((step) => ({
          ...step,
          data: step.data && decrypt(step.data),
        })),
        ctx: {
          event: ctx.event && {
            ...ctx.event,
            data: ctx.event.data && decrypt(ctx.event.data),
          },
          events:
            ctx.events &&
            ctx.events.map((event) => ({
              ...event,
              data: event.data && decrypt(event.data),
            })),
        } as {},
      }),
      transformOutput: (ctx) => {
        if (!ctx.step) {
          return;
        }

        return {
          result: {
            data: ctx.result.data && encrypt(ctx.result.data),
          },
        };
      },
    }),
  }),
});

Logging

The following shows how you can create a logger middleware and customize it to your needs.

It is based on the built-in logger middleware in the SDK, and we hope it gives you an idea of what you can do if the built-in logger doesn't meet your needs.

new InngestMiddleware({
  name: "Inngest: Logger",
  init({ client }) {
    return {
      onFunctionRun(arg) {
        const { ctx } = arg;
        const metadata = {
          runID: ctx.runId,
          eventName: ctx.event.name,
          functionName: arg.fn.name,
        };

        let providedLogger: Logger = client["logger"];
        // Create a child logger if the provided logger implements `child()`
        try {
          if ("child" in providedLogger) {
            type ChildLoggerFn = (
              metadata: Record<string, unknown>
            ) => Logger;
            providedLogger = (providedLogger.child as ChildLoggerFn)(metadata);
          }
        } catch (err) {
          // Fall back to the provided logger if a child can't be created
          console.error('failed to create "childLogger" with error: ', err);
        }
        const logger = new ProxyLogger(providedLogger);

        return {
          transformInput() {
            return {
              ctx: {
                /**
                 * The passed in logger from the user.
                 * Defaults to a console logger if not provided.
                 */
                logger,
              },
            };
          },
          beforeExecution() {
            logger.enable();
          },
          transformOutput({ result: { error } }) {
            if (error) {
              logger.error(error);
            }
          },
          async beforeResponse() {
            await logger.flush();
          },
        };
      },
    };
  },
})
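
To illustrate the child-logger branch above, here is a small self-contained sketch of a logger that supports child(). MemoryLogger and LogLine are hypothetical names, and the local Logger interface (info, warn, error, debug) is an assumption about the shape the middleware expects rather than an import from the SDK.

```typescript
// Assumed logger shape: four leveled methods taking arbitrary arguments.
interface Logger {
  info(...args: unknown[]): void;
  warn(...args: unknown[]): void;
  error(...args: unknown[]): void;
  debug(...args: unknown[]): void;
}

type LogLine = {
  level: string;
  metadata: Record<string, unknown>;
  args: unknown[];
};

// Hypothetical logger that collects lines in memory and supports `child()`.
class MemoryLogger implements Logger {
  readonly lines: LogLine[];
  readonly metadata: Record<string, unknown>;

  constructor(lines: LogLine[] = [], metadata: Record<string, unknown> = {}) {
    this.lines = lines;
    this.metadata = metadata;
  }

  // Child loggers share the parent's sink but carry extra metadata.
  child(metadata: Record<string, unknown>): MemoryLogger {
    return new MemoryLogger(this.lines, { ...this.metadata, ...metadata });
  }

  info(...args: unknown[]) {
    this.write("info", args);
  }
  warn(...args: unknown[]) {
    this.write("warn", args);
  }
  error(...args: unknown[]) {
    this.write("error", args);
  }
  debug(...args: unknown[]) {
    this.write("debug", args);
  }

  private write(level: string, args: unknown[]) {
    this.lines.push({ level, metadata: this.metadata, args });
  }
}

// Same detection as in the middleware: use `child()` only when it exists.
const rootLogger = new MemoryLogger();
let providedLogger: Logger = rootLogger;

if ("child" in providedLogger) {
  type ChildLoggerFn = (metadata: Record<string, unknown>) => Logger;
  providedLogger = (providedLogger.child as ChildLoggerFn)({
    runID: "run_123",
    eventName: "demo/event.sent",
  });
}

providedLogger.info("function started");
```

With a logger like this, every line written during a run carries the run's metadata without each function having to add it. A logger without child() skips the branch and is used as-is.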

Prisma in function context

The following is an example of adding a Prisma client to all Inngest functions, allowing them immediate access without needing to create the client themselves.

While this example uses Prisma, it serves as a good example of using the onFunctionRun -> transformInput hook to mutate function input, performing crucial setup for your functions so that they contain only business logic.

💡 Types are inferred from middleware outputs, so your Inngest functions will see an appropriately-typed prisma property in their input.

inngest.createFunction(
  { id: "example" },
  { event: "app/user.loggedin" },
  async ({ prisma }) => {
    await prisma.auditTrail.create(/* ... */);
  }
);

import { InngestMiddleware } from "inngest";
import { PrismaClient } from "@prisma/client";

const prismaMiddleware = new InngestMiddleware({
  name: "Prisma Middleware",
  init() {
    const prisma = new PrismaClient();

    return {
      onFunctionRun() {
        return {
          transformInput() {
            return {
              // Anything passed via `ctx` will be merged with the function's arguments
              ctx: {
                prisma,
              },
            };
          },
        };
      },
    };
  },
});

Check out Common actions for every function to see how this technique can be used to create steps for all of your unique logic.


Sentry error reporting and tracing

This example uses Sentry to:

  • Capture exceptions for reporting
  • Add tracing to each function run
  • Include useful context for each exception and trace, such as function ID and event names

import * as Sentry from "@sentry/node";
import { InngestMiddleware } from "inngest";

const sentryMiddleware = new InngestMiddleware({
  name: "Sentry Middleware",
  init({ client }) {
    // Initialize Sentry as soon as possible, creating a hub
    Sentry.init({ dsn: "..." });

    // Set up some tags that will be applied to all events
    Sentry.setTag("inngest.client.id", client.id);

    return {
      onFunctionRun({ ctx, fn }) {
        // Add specific context for the given function run
        Sentry.setTags({
          "inngest.function.id": fn.id(client.id),
          "inngest.function.name": fn.name,
          "inngest.event": ctx.event.name,
          "inngest.run.id": ctx.runId,
        });

        // Start a transaction for this run
        const transaction = Sentry.startTransaction({
          name: "Inngest Function Run",
          op: "run",
          data: ctx.event,
        });

        let memoSpan: Sentry.Span;
        let execSpan: Sentry.Span;

        return {
          transformInput() {
            return {
              ctx: {
                // Add the Sentry client to the input arg so our
                // functions can use it directly too
                sentry: Sentry.getCurrentHub(),
              },
            };
          },
          beforeMemoization() {
            // Track different spans for memoization and execution
            memoSpan = transaction.startChild({ op: "memoization" });
          },
          afterMemoization() {
            memoSpan.finish();
          },
          beforeExecution() {
            execSpan = transaction.startChild({ op: "execution" });
          },
          afterExecution() {
            execSpan.finish();
          },
          transformOutput({ result, step }) {
            // Capture step output and log errors
            if (step) {
              Sentry.setTags({
                "inngest.step.name": step.displayName,
                "inngest.step.op": step.op,
              });

              if (result.error) {
                Sentry.captureException(result.error);
              }
            }
          },
          async beforeResponse() {
            // Finish the transaction and flush data to Sentry before the
            // request closes
            transaction.finish();
            await Sentry.flush();
          },
        };
      },
    };
  },
});