Concurrency management

Managing concurrency is important for any production system. Inngest lets you set concurrency limits on your functions, which control the number of steps executing code at any one time. Limits are declared directly in code and work by creating multi-level virtual queues within each function, so you do not need to manage any queueing infrastructure yourself.

Concurrency use cases

Use cases include:

  • Setting individual function concurrency limits, for example to only run 10 imports at once.
  • Setting global limits for many functions which share a pool of resources, for example functions which use shared AI capacity or a global pool of DB connections.
  • Setting limits on your own individual accounts, tenants or users, for example limiting unpaid users to a specific capacity.
  • A combination of the above, with functions limited by the first concurrency key to hit limits.

Examples and configuration

Inngest lets you provide fine-grained concurrency across all functions in a simple, configurable manner. You can control each function's concurrency limits within your function definition. Here are two examples:

// Example 1: a simple concurrency definition limiting this function to 10 steps at once.
inngest.createFunction(
  {
    id: "another-function",
    concurrency: 10,
  },
  { event: "ai/summary.requested" },
  async ({ event, step }) => {
  }
);

// Example 2: a complete, more complex example with two virtual concurrency queues.
inngest.createFunction(
  {
    id: "unique-function-id",
    concurrency: [
      {
         // Use an account-level concurrency limit for this function, using the
         // "openai" key as a virtual queue. Any other function which
         // runs using the same "openai" key counts towards this limit.
         scope: "account",
         key: `"openai"`,
         // If there are 10 functions running with the "openai" key, this function's
         // runs will wait for capacity before executing.
         limit: 10,
      },
      {
         // Create another virtual concurrency queue for this function only. This
         // limits each account to a single executing step of this function at a
         // time, based on the `event.data.account_id` field.
         // "fn" is the default scope, so we could omit this field.
         scope: "fn",
         key: "event.data.account_id",
         limit: 1,
      },
    ],
  },
  { event: "ai/summary.requested" },
  async ({ event, step }) => {
  }
);

In the first example, the function is constrained to 10 executing steps at once.

In the second example, we define two concurrency constraints that create two virtual queues to manage capacity. Runs will be limited if they hit the limit of either virtual queue. For example:

  • If there are 10 steps executing under the 'openai' key's virtual queue, any future runs will be blocked and will wait for existing runs to finish before executing.
  • If there are 5 steps executing under the 'openai' key and a single event.data.account_id enqueues 2 runs, the second run is limited by the event.data.account_id virtual queue and will wait before executing.
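
The two scenarios above can be traced with a small model. This is only an illustrative sketch, not Inngest's scheduler or SDK: the `Constraint` type, the `canStart` helper, the in-memory counters, and the `account:acct_1` style queue names are all hypothetical. It shows the one rule that matters here: a step may start only if every virtual queue it belongs to has spare capacity.

```typescript
// Illustrative model only: not Inngest's scheduler or SDK API.
type Constraint = { queue: string; limit: number };

// A step may start only when every virtual queue it belongs to has spare capacity.
function canStart(constraints: Constraint[], executing: Map<string, number>): boolean {
  return constraints.every((c) => (executing.get(c.queue) ?? 0) < c.limit);
}

// Constraints for a run belonging to a given (hypothetical) account id.
function constraintsFor(accountId: string): Constraint[] {
  return [
    { queue: "openai", limit: 10 },
    { queue: `account:${accountId}`, limit: 1 },
  ];
}

// Scenario 1: 10 steps are already executing under the "openai" queue.
const full = new Map<string, number>([["openai", 10]]);
const blockedByOpenAi = !canStart(constraintsFor("acct_1"), full);

// Scenario 2: 5 steps under "openai", and acct_1 already has one step executing.
const partial = new Map<string, number>([
  ["openai", 5],
  ["account:acct_1", 1],
]);
const secondRunWaits = !canStart(constraintsFor("acct_1"), partial);
const otherAccountRuns = canStart(constraintsFor("acct_2"), partial);

console.log({ blockedByOpenAi, secondRunWaits, otherAccountRuns });
```

In this model a different account (`acct_2`) can still start work in the second scenario, because neither of its queues is full.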

How concurrency works

Concurrency works by limiting the number of steps executing at a single time. Within Inngest, execution is defined as "an SDK running code". Calling step.sleep, step.sleepUntil, step.waitForEvent, or step.invoke does not count towards capacity limits, as the SDK doesn't execute code while those steps wait. Because sleeping or waiting is common, concurrency does not limit the number of functions in progress. Instead, it limits the number of steps executing at any single time.
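
To make the distinction between "in progress" and "executing" concrete, here is a minimal sketch. It is a hypothetical model, not SDK code: the `StepKind` type and `occupiesSlot` helper are invented for illustration, and it assumes that `step.run` is the step type in which the SDK is running your code.

```typescript
// Illustrative model only; the step kinds mirror the SDK methods named above.
type StepKind = "run" | "sleep" | "sleepUntil" | "waitForEvent" | "invoke";

// Only steps where the SDK is actively running code occupy a concurrency slot.
function occupiesSlot(kind: StepKind): boolean {
  return kind === "run";
}

// Hypothetical snapshot: the current step of five in-progress runs of one function.
const inProgress: StepKind[] = ["run", "sleep", "waitForEvent", "run", "invoke"];
const slotsUsed = inProgress.filter(occupiesSlot).length;

console.log(`${inProgress.length} runs in progress, ${slotsUsed} slots used`);
```

With a concurrency limit of 2, this function would be at capacity even though five runs are in progress.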

Queues are ordered from oldest to newest jobs (FIFO) within the same function. Ordering amongst different functions is not guaranteed. This means that within a specific function, Inngest prioritizes finishing older function runs above starting newer ones, even if the older runs continue to schedule new steps. Different functions, however, compete for capacity, with runs on the most backlogged function much more likely (but not guaranteed) to be scheduled first.
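
The within-function ordering can be pictured with the sketch below. It is an assumption-laden model rather than Inngest's queue implementation: `PendingStep`, `nextStep`, and ordering by a `runStartedAt` timestamp are hypothetical stand-ins for "older runs go first".

```typescript
// Illustrative model only: not Inngest's queue implementation.
type PendingStep = { runId: string; runStartedAt: number; step: string };

// Within one function, pick the pending step whose run started earliest.
function nextStep(pending: PendingStep[]): PendingStep | undefined {
  return [...pending].sort((a, b) => a.runStartedAt - b.runStartedAt)[0];
}

const pending: PendingStep[] = [
  // A new run waiting to start its first step.
  { runId: "run-new", runStartedAt: 200, step: "step-1" },
  // An older run that has just scheduled its third step.
  { runId: "run-old", runStartedAt: 100, step: "step-3" },
];

const chosen = nextStep(pending);
console.log(chosen?.runId); // prints "run-old"
```

The older run's third step is picked ahead of the newer run's first step, which is how older runs get finished before newer ones start.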

Inngest manages concurrency for you within our scheduling system, so you do not need to provision queues or infrastructure, or manage concurrency limits within your own workers or services.

Some additional information:

  • The order of keys does not matter. Concurrency is limited by any key that reaches its limits.
  • You can specify multiple keys for the same scope, as long as the resulting key evaluates to a different string.

Concurrency control across specific steps in a function

You might need to set a different concurrency limit for a single step in a function. For example, within an AI flow you may have 10 pre-processing steps which can run with higher limits, and a single AI call with much lower limits.

To control concurrency on individual steps, extract the step into a new function with its own concurrency controls, and invoke the new function using step.invoke. This lets you combine concurrency controls and manage "flow control" in a clean, composable manner.
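
As a sketch of this pattern, the example below moves the AI call into its own function with a low limit and invokes it from a parent flow with a higher limit. The client id, function ids, event names, limits, and the `documentId` field are all hypothetical, and the AI call itself is a placeholder.

```typescript
import { Inngest } from "inngest";

// Hypothetical client, ids, and event names, for illustration only.
const inngest = new Inngest({ id: "my-app" });

// The AI call lives in its own function with a much lower concurrency limit.
export const summarize = inngest.createFunction(
  { id: "ai-summarize", concurrency: 2 },
  { event: "ai/summarize.requested" },
  async ({ event, step }) => {
    return await step.run("call-model", async () => {
      // Placeholder for the real AI call.
      return { summary: `Summary of ${event.data.documentId}` };
    });
  }
);

// The parent flow keeps a higher limit for its pre-processing steps.
export const processDocument = inngest.createFunction(
  { id: "process-document", concurrency: 50 },
  { event: "document/uploaded" },
  async ({ event, step }) => {
    await step.run("pre-process", async () => {
      // Placeholder for pre-processing work.
    });

    // The invoked function is limited by its own concurrency settings.
    return await step.invoke("summarize-document", {
      function: summarize,
      data: { documentId: event.data.documentId },
    });
  }
);
```

Because waiting on step.invoke does not count towards capacity, the parent function's slots stay free for pre-processing while the AI call queues under its own limit.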

How global limits work

Two functions can set different limits for the same account-scoped key, but we strongly recommend defining the limit once in a global constant and sharing it across functions.

You may write two functions that define different limits for the same 'account' scoped concurrency key. For example, function A may limit the "openai" capacity to 5, while function B limits the "openai" capacity to 50:

inngest.createFunction(
  {
    id: "func-a",
    concurrency: {
      scope: "account",
      key: `"openai"`,
      limit: 5,
    },
  },
  { event: "ai/summary.requested" },
  async ({ event, step }) => {
  }
);

inngest.createFunction(
  {
    id: "func-b",
    concurrency: {
      scope: "account",
      key: `"openai"`,
      limit: 50,
    },
  },
  { event: "ai/summary.requested" },
  async ({ event, step }) => {
  }
);

This works in Inngest and is not a conflict. Instead, function A is limited any time there are 5 or more functions running in the 'openai' queue. Function B, however, is limited when there are 50 or more items in the queue. This means that function B has more capacity than function A, though both are limited and compete on the same virtual queue.

Because queues are FIFO, runs become more likely to be worked on the older their jobs get (as the backlog grows). If function A's jobs stay in the backlog longer than function B's jobs, function A's jobs are likely to be worked on as soon as capacity is free. That said, function B will almost always have capacity before function A and may block function A's work.
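
A rough way to see how the two limits interact on one queue is sketched below. The `hasCapacity` helper is hypothetical and ignores ordering entirely; it only models each function comparing the shared queue's executing count against its own limit.

```typescript
// Illustrative model only: not Inngest's scheduler.
// Each function compares the shared queue's executing count with its own limit.
function hasCapacity(executingInQueue: number, ownLimit: number): boolean {
  return executingInQueue < ownLimit;
}

const FUNC_A_LIMIT = 5;
const FUNC_B_LIMIT = 50;

for (const executing of [3, 5, 49, 50]) {
  console.log(
    `${executing} executing: A=${hasCapacity(executing, FUNC_A_LIMIT)}, ` +
      `B=${hasCapacity(executing, FUNC_B_LIMIT)}`
  );
}
```

Between 5 and 49 executing items, only function B can start new work, which is why function B can crowd out function A on the shared queue.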

While this works, we strongly recommend that you use global constants for env or account level scopes, giving functions the same limit.
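
One possible way to follow this recommendation is to define the constraint once and reference it from every function. The constant name, the limit of 10, and the `funcAConfig` / `funcBConfig` objects below are assumptions for illustration; each config object stands for what would be passed as the first argument to inngest.createFunction.

```typescript
// Hypothetical shared module; the constant name and limit value are assumptions.
const OPENAI_CONCURRENCY = {
  scope: "account",
  // A quoted string literal, so every event evaluates to the same queue name.
  key: `"openai"`,
  limit: 10,
} as const;

// Config objects that would be passed as the first argument to inngest.createFunction.
const funcAConfig = { id: "func-a", concurrency: OPENAI_CONCURRENCY };
const funcBConfig = { id: "func-b", concurrency: OPENAI_CONCURRENCY };

console.log(funcAConfig.concurrency.limit === funcBConfig.concurrency.limit);
```

Changing the limit in one place then updates every function that shares the queue, so the limits cannot drift apart.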

Limitations

  • Concurrency limits the number of steps executing at a single time. It does not yet perform rate limiting over a given period of time. This is scheduled for a Q2 '24 release.
  • Functions can specify up to 2 concurrency constraints at once.
  • The maximum concurrency limit is defined by your account's plan.
  • Ordering within the same function is guaranteed (with the exception of retries).
  • Ordering amongst different functions is not guaranteed. Functions compete with each other randomly to be scheduled.

Concurrency reference

  • Name: limit
    Type: number
    Required: yes
    Description:

    The maximum number of concurrently running steps. A value of 0 or undefined is the equivalent of not setting a limit. The maximum value is dictated by your account's plan.

  • Name: scope
    Type: 'account' | 'env' | 'fn'
    Required: no
    Description:

    The scope for the concurrency limit, which impacts whether concurrency is managed on an individual function, across an environment, or across your entire account.

    • fn (default): only the runs of this function affect the concurrency limit
    • env: all runs within the same environment that share the same evaluated key value will affect the concurrency limit. This requires setting a key which evaluates to a virtual queue name.
    • account: every run that shares the same evaluated key value will affect the concurrency limit, across every environment. This requires setting a key which evaluates to a virtual queue name.

    Each SDK exposes these enums in the idiomatic manner of a given language, though the meanings of the enums are the same across all languages.

  • Name: key
    Type: string
    Required: no
    Description:

    An expression which evaluates to a string given the triggering event. The string returned from the expression is used as the concurrency queue name. A key is required when setting an env or account level scope.

    Expressions are defined using the Common Expression Language (CEL) with the original event accessible using dot-notation. Read our guide to writing expressions for more info. Examples:

    • Limit concurrency to n (via limit) per customer id: 'event.data.customer_id'
    • Limit concurrency to n per user, per import id: 'event.data.user_id + "-" + event.data.import_id'
    • Limit globally using a specific string: '"global-quoted-key"' (wrapped in quotes so that the expression evaluates to a string literal rather than a field lookup)
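
To show what queue names the three example expressions would produce, the sketch below mimics them with plain functions. This is not how keys are evaluated in practice (Inngest evaluates the CEL strings itself); the `ImportEvent` type and the sample ids are hypothetical.

```typescript
// Illustrative only: plain functions that mimic the result of each example expression.
type ImportEvent = {
  data: { customer_id: string; user_id: string; import_id: string };
};

// 'event.data.customer_id'
const perCustomer = (e: ImportEvent): string => e.data.customer_id;

// 'event.data.user_id + "-" + event.data.import_id'
const perUserImport = (e: ImportEvent): string => e.data.user_id + "-" + e.data.import_id;

// '"global-quoted-key"': a string literal, identical for every event
const globalKey = (_e: ImportEvent): string => "global-quoted-key";

const evt: ImportEvent = {
  data: { customer_id: "cus_1", user_id: "usr_7", import_id: "imp_42" },
};

console.log(perCustomer(evt), perUserImport(evt), globalKey(evt));
// prints "cus_1 usr_7-imp_42 global-quoted-key"
```

Every distinct string is its own virtual queue, so the first key yields one queue per customer, the second one queue per user and import pair, and the third a single shared queue.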

Further examples

Handling third party API rate limits

Here, we use the Resend SDK to send an email. Resend's rate limit is 10 requests per second. Because our function is simple and may execute multiple times per second, we set a low concurrency limit of 4 to keep throughput comfortably below that rate limit:

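import { Resend } from "resend";

// Assumes an `inngest` client is defined elsewhere and that the
// RESEND_API_KEY environment variable holds your Resend API key.
const resend = new Resend(process.env.RESEND_API_KEY);

export const send = inngest.createFunction(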
  {
    name: "Email: Pending invoice",
    id: "email-pending-invoice",
    concurrency: {
      limit: 4, // Resend's rate limit is 10 req/s
    },
  },
  { event: "billing/invoice.pending" },
  async ({ event, step }) => {
    await step.run("send-email", async () => {
      return await resend.emails.send({
        from: "hello@myco.com",
        to: event.user.email,
        subject: `Invoice pending for ${event.data.invoicePeriod}`,
        text: `Dear user, ...`,
      });
    });

    return { message: "success" };
  }
);
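
Since concurrency is not rate limiting, a concurrency limit only bounds request throughput indirectly. As a back-of-the-envelope sketch (the `maxRequestsPerSecond` helper and the step durations are assumptions, and scheduling overhead is ignored), a limit of `n` with steps that each take `d` seconds allows at most `n / d` requests per second:

```typescript
// Rough upper bound on requests per second for a given concurrency limit,
// assuming every step takes avgStepSeconds and free slots are refilled instantly.
function maxRequestsPerSecond(limit: number, avgStepSeconds: number): number {
  return limit / avgStepSeconds;
}

const RESEND_LIMIT_PER_SECOND = 10; // rate limit quoted in the text above
const CONCURRENCY_LIMIT = 4;

// Hypothetical average durations for the "send-email" step.
for (const avgStepSeconds of [0.25, 0.5, 1]) {
  const rate = maxRequestsPerSecond(CONCURRENCY_LIMIT, avgStepSeconds);
  console.log(
    `${avgStepSeconds}s per step: up to ${rate} req/s, ` +
      `within limit: ${rate <= RESEND_LIMIT_PER_SECOND}`
  );
}
```

Under these assumptions, a limit of 4 stays below 10 requests per second only while each send takes at least 0.4 seconds, so it is worth measuring the step's real duration before relying on this setting.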

Restricting parallel import jobs for a customer id

In this hypothetical system, customers can upload .csv files which each need to be processed and imported. We want to limit each customer to only one import job at a time so that no two jobs write to the same customer's data at once. We do this by setting limit: 1 and using event.data.customerId as the concurrency key. The customerId is included in every event payload.

Inngest ensures that the concurrency (1) applies to each unique value for event.data.customerId. This allows different customers to have functions running at the same exact time, but no given customer can have two functions running at once!

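// Assumes an `inngest` client and a storage client named `bucket` are defined elsewhere.
export const processCsvImport = inngest.createFunction(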
  {
    name: "Process customer csv import",
    id: "process-customer-csv-import",
    concurrency: {
      limit: 1,
      key: `event.data.customerId`, // You can use any piece of data from the event payload
    },
  },
  { event: "csv/file.uploaded" },
  async ({ event, step }) => {
    await step.run("process-file", async () => {
      const file = await bucket.fetch(event.data.fileURI);
      // ...
    });

    return { message: "success" };
  }
);