
What Python's asyncio primitives get wrong about shared state

We tried Event, Condition, and Queue. Each one gets closer but still breaks under real concurrency. Here's the observable pattern that finally works.

Aaron Harper· 3/4/2026 · 13 min read

Coordinating concurrent tasks around shared state is one of the most common problems in Python's asyncio. The standard library gives you asyncio.Event and asyncio.Condition, but each has a gap that only shows up under real concurrency pressure. We hit this while building Inngest's Python SDK, where multiple async handlers coordinate around WebSocket connection state.

This post works through each primitive, shows exactly where it breaks, and iterates toward a solution that handles every case we threw at it.

The scenario

Imagine an async Python app managing a connection that moves through states:

disconnected → connecting → connected → closing → closed

One of your concurrent handlers needs to drain pending requests when the connection starts shutting down. It has to wait for the closing state:

state = "disconnected"

async def drain_requests():
    # Need to wait until state == "closing"
    ...
    print("draining pending requests")

Simple enough. Let's see how each stdlib tool handles it.

Attempt 1: Polling

The most obvious approach: check the value in a loop.

async def drain_requests():
    while state != "closing":
        await asyncio.sleep(0.1)
    print("draining pending requests")

This works. But the tradeoffs are bad:

  • Latency vs. efficiency: A short sleep interval wastes CPU cycles. A long one adds latency. There's no good value.
  • Duplication: Every consumer reimplements the same polling loop with the same tradeoff.
  • No event-driven wake: The consumer runs whether or not anything changed.

We can do better. What we actually want is to sleep until the state changes, not sleep for an arbitrary duration and check.

Attempt 2: asyncio.Event

asyncio.Event is the stdlib's answer to "wake me up when something happens":

closing_event = asyncio.Event()

async def drain_requests():
    await closing_event.wait()
    print("draining pending requests")

No polling, no wasted cycles. The handler blocks until the event fires. But Event is boolean: it's either set or unset. Our connection has five states, and drain_requests only cares about one of them. What happens when another handler needs to wait for connected? You need a second event. A third handler waiting for "not disconnected"? A third event with inverted logic. The setter has to know about all of them:

closing_event = asyncio.Event()
connected_event = asyncio.Event()

async def set_state(new_state):
    global state
    state = new_state
    if new_state == "closing":
        closing_event.set()
    if new_state == "connected":
        connected_event.set()

Every new condition requires another Event object, and the coordination between them is where bugs live. Forget a set() call and a consumer blocks forever; forget a clear() call and a later consumer proceeds on stale state.
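To make the stale-state half of that concrete, here is a minimal sketch. The `demo` function and its local `set_state` are illustrative stand-ins, not code from the SDK. The setter never clears the event, so a handler that starts waiting after the connection has already moved on to "closed" still gets through.

```python
import asyncio


async def demo() -> tuple[str, bool]:
    state = "disconnected"
    closing_event = asyncio.Event()

    def set_state(new_state: str) -> None:
        nonlocal state
        state = new_state
        if new_state == "closing":
            closing_event.set()
        # Deliberate bug: nothing ever calls closing_event.clear().

    set_state("closing")
    set_state("closed")

    # A handler that starts waiting only now is not blocked. The event is
    # sticky: it records "closing happened at some point", not "the state
    # is closing right now".
    await asyncio.wait_for(closing_event.wait(), timeout=1.0)
    return state, closing_event.is_set()


print(asyncio.run(demo()))  # ('closed', True)
```

Whether the sticky behavior is a bug depends on the consumer. The point is that an Event cannot express which of the two meanings is intended, so every setter and every waiter has to agree on it by convention.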

Attempt 3: asyncio.Condition

asyncio.Condition lets consumers wait on arbitrary predicates:

state = "disconnected"
condition = asyncio.Condition()

async def drain_requests():
    async with condition:
        await condition.wait_for(lambda: state == "closing")
    print("draining pending requests")

One coordination point, arbitrary predicates, no proliferation of Event objects. This is much better.

But it breaks under a common pattern.

The lost update

Condition is designed to check the current value when a consumer wakes up. That's fine when the state a consumer wants sticks around until it runs, but it falls apart when transitions are fast. When the setter changes state, it calls notify_all(), which schedules wakeups for every waiting consumer. But in a single-threaded event loop, no consumer actually runs until the current coroutine yields. If the value changes again before that happens, consumers wake up and re-evaluate their predicate against the current value, not the value that triggered the notification. The predicate fails and the consumer goes back to sleep, potentially forever.

# Two transitions in quick succession:
await set_state("closing")  # notify_all() schedules wakeups
await set_state("closed")   # state changes again before consumers run

# drain_requests finally wakes, sees "closed", not "closing".
# Pending requests get silently dropped.

Here's a runnable reproduction:

import asyncio

state = "disconnected"
condition = asyncio.Condition()

async def set_state(new_state):
    global state
    async with condition:
        state = new_state
        condition.notify_all()

async def drain_requests():
    async with condition:
        await condition.wait_for(lambda: state == "closing")
    print("draining pending requests")

async def main():
    task = asyncio.create_task(drain_requests())
    await asyncio.sleep(0)  # Let drain_requests start waiting

    await set_state("closing")  # Briefly "closing"...
    await set_state("closed")   # ...then immediately "closed"

    await asyncio.wait_for(task, timeout=1.0)
    # TimeoutError: drain_requests never sees "closing"

asyncio.run(main())

The value was "closing", but by the time drain_requests wakes and checks, it's already "closed". The intermediate state is gone.

This isn't a contrived edge case. In our SDK's connection manager, a close signal can arrive and the connection can shut down in the same event loop tick. drain_requests never gets past its wait, and any in-flight work just disappears.

The fix: per-consumer queues

Instead of waking consumers and asking "is the current state what you want?", buffer every transition into a per-consumer queue. Each consumer drains its own queue and checks each transition individually. The consumer never misses a state.

Each consumer registers its own asyncio.Queue. When the value changes, the setter pushes (old, new) into every registered queue. Here's a simplified version that illustrates the core idea:

import asyncio

class ValueWatcher:
    def __init__(self, initial_value):
        self._value = initial_value
        self._watch_queues: list[asyncio.Queue] = []

    @property
    def value(self):
        return self._value

    @value.setter
    def value(self, new_value):
        if new_value == self._value:
            return

        old_value = self._value
        self._value = new_value

        # Notify all consumers
        for queue in self._watch_queues:
            queue.put_nowait((old_value, new_value))

    async def wait_for(self, target):
        queue = asyncio.Queue()
        self._watch_queues.append(queue)

        try:
            if self._value == target:
                return

            while True:
                old, new = await queue.get()
                if new == target:
                    return
        finally:
            self._watch_queues.remove(queue)

wait_for registers a queue, checks the current value, then drains transitions until it finds a match. The try/finally ensures the queue gets deregistered even if the caller cancels.

The queue buffers and delivers every intermediate transition in order, even if the value changes multiple times before a consumer runs.
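As a quick check, the sketch below replays the lost-update scenario from the Condition section against the simplified class. The class body is reproduced so the block runs on its own; the `main` function and its `log` list are illustrative additions.

```python
import asyncio


class ValueWatcher:
    """Simplified watcher from above, reproduced so this sketch runs alone."""

    def __init__(self, initial_value):
        self._value = initial_value
        self._watch_queues: list[asyncio.Queue] = []

    @property
    def value(self):
        return self._value

    @value.setter
    def value(self, new_value):
        if new_value == self._value:
            return
        old_value, self._value = self._value, new_value
        for queue in self._watch_queues:
            queue.put_nowait((old_value, new_value))

    async def wait_for(self, target):
        queue = asyncio.Queue()
        self._watch_queues.append(queue)
        try:
            if self._value == target:
                return
            while True:
                _old, new = await queue.get()
                if new == target:
                    return
        finally:
            self._watch_queues.remove(queue)


async def main() -> list[str]:
    state = ValueWatcher("disconnected")
    log: list[str] = []

    async def drain_requests():
        await state.wait_for("closing")
        log.append("draining pending requests")

    task = asyncio.create_task(drain_requests())
    await asyncio.sleep(0)  # Let drain_requests register its queue

    state.value = "closing"  # Buffered as ("disconnected", "closing")
    state.value = "closed"   # Buffered as ("closing", "closed")

    await asyncio.wait_for(task, timeout=1.0)  # Completes without timing out
    log.append(f"final state: {state.value}")
    return log


print(asyncio.run(main()))
# ['draining pending requests', 'final state: closed']
```

Note that by the time drain_requests resumes, the current value is already "closed". The queue guarantees the handler learns that "closing" happened; it does not freeze the state while the handler runs.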

Making it production-ready

We still need a handful of features to make it production-ready. Our final implementation needs the following:

  • Thread safety: A threading.Lock protects the value and queue list. Each queue is paired with its event loop, and the setter uses loop.call_soon_threadsafe instead of put_nowait directly.
  • Race-free registration: wait_for registers its queue under the lock and then re-checks the current value, closing the race where a transition could slip between the initial check and registration.
  • Full generic typing: Generic[T] end-to-end, so predicates, queues, and return values are all type-checked.
  • Predicate-based matching: wait_for, wait_for_not, and wait_for_not_none all route through a shared _wait_for_condition(predicate) core.
  • Timeouts: Every wait method accepts an optional timeout, backed by asyncio.wait_for.
  • Conditional set: set_if atomically sets the value only when the current value satisfies a predicate, useful for state machine transitions that should only happen from a specific state.
  • Change watching: wait_for_change waits for any transition regardless of value, handy for logging or reacting to state churn.
  • Callback API: on_change and on_value for synchronous consumers alongside the async wait API.
  • Resilient notifications: The setter catches RuntimeError (closed loop) and suppresses callback exceptions so one failure doesn't block other consumers.
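The thread-safety item is the least obvious of these, so here is a minimal sketch of the hand-off in isolation. The `worker` thread and the hard-coded transition are assumptions for illustration. asyncio.Queue is not thread-safe, so the worker asks the loop to perform the put on the loop's own thread instead of calling put_nowait directly.

```python
import asyncio
import threading


async def main() -> tuple[str, str]:
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue[tuple[str, str]] = asyncio.Queue()

    def worker() -> None:
        # Runs on a plain thread. call_soon_threadsafe schedules the put on
        # the event loop's thread and wakes the loop if it is idle.
        loop.call_soon_threadsafe(queue.put_nowait, ("connected", "closing"))

    thread = threading.Thread(target=worker)
    thread.start()

    # The waiter stays ordinary asyncio code; it never touches the thread.
    transition = await asyncio.wait_for(queue.get(), timeout=1.0)
    thread.join()
    return transition


print(asyncio.run(main()))  # ('connected', 'closing')
```

Storing the loop next to each queue, as the bullet describes, is what lets a setter on any thread find the right loop to schedule on.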

The full implementation is about 300 lines, most of which is docstrings and convenience methods built on the same core. Feel free to copy it into your codebase!

Full ValueWatcher source
from __future__ import annotations

import asyncio
import threading
import typing

T = typing.TypeVar("T")

# Used by `wait_for_not_none` to narrow `ValueWatcher[X | None]` to `X`.
S = typing.TypeVar("S")


class ValueWatcher(typing.Generic[T]):
    """
    Thread-safe observable value with async watchers.

    Watchers can await value changes via methods like `wait_for` and
    `wait_for_change`. Alternatively, they can add callbacks via `on_change` and
    `on_value`.

    Any thread can set `.value`, and the watcher will react accordingly.
    """

    def __init__(
        self,
        initial_value: T,
        *,
        on_change: typing.Callable[[T, T], None] | None = None,
    ) -> None:
        """
        Args:
            initial_value: The initial value.
            on_change: Called when the value changes. Good for debug logging.
        """

        self._lock = threading.Lock()
        self._on_changes: list[typing.Callable[[T, T], None]] = []
        if on_change:
            self._on_changes.append(on_change)

        # Every watcher gets its own (loop, queue) pair. Storing the loop lets
        # the setter use `call_soon_threadsafe` for cross-thread notification.
        # Queue items are (old, new) tuples.
        self._watch_queues: list[
            tuple[asyncio.AbstractEventLoop, asyncio.Queue[tuple[T, T]]]
        ] = []

        # Hold references to fire-and-forget tasks to prevent GC.
        self._background_tasks: set[asyncio.Task[T]] = set()

        self._value = initial_value

    @property
    def value(self) -> T:
        with self._lock:
            return self._value

    @value.setter
    def value(self, new_value: T) -> None:
        with self._lock:
            if new_value == self._value:
                return

            old_value = self._value
            self._value = new_value

            # Snapshot lists under lock to avoid iteration issues
            queues = list(self._watch_queues)
            callbacks = list(self._on_changes)

        # Notify all watchers outside the lock to avoid deadlock.
        for loop, queue in queues:
            try:
                # `call_soon_threadsafe` wakes the target loop's selector
                # immediately. A plain `put_nowait` wouldn't poke the self-pipe,
                # so a cross-thread watcher could stall until something else
                # wakes its loop.
                #
                # In other words, without `call_soon_threadsafe`, a watcher
                # could get the changed value notification long after the value
                # actually changed.
                loop.call_soon_threadsafe(
                    queue.put_nowait, (old_value, new_value)
                )
            except RuntimeError:
                # Target event loop is closed.
                pass

        for on_change in callbacks:
            try:
                on_change(old_value, new_value)
            except Exception:
                # Suppress exceptions from callbacks so one failure doesn't skip
                # the rest.
                pass

    def set_if(
        self,
        new_value: T,
        condition: typing.Callable[[T], bool],
    ) -> bool:
        """
        Atomically set the value only if the current value satisfies the
        condition. Returns True if the value was set.
        """

        with self._lock:
            if not condition(self._value):
                return False

            if new_value == self._value:
                return True

            old_value = self._value
            self._value = new_value

            queues = list(self._watch_queues)
            callbacks = list(self._on_changes)

        for loop, queue in queues:
            try:
                loop.call_soon_threadsafe(
                    queue.put_nowait, (old_value, new_value)
                )
            except RuntimeError:
                pass

        for on_change in callbacks:
            try:
                on_change(old_value, new_value)
            except Exception:
                pass

        return True

    def on_change(self, callback: typing.Callable[[T, T], None]) -> None:
        """
        Add a callback that's called when the value changes.

        Args:
            callback: Called with (old_value, new_value) on each change.
        """

        with self._lock:
            self._on_changes.append(callback)

    def on_value(self, value: T, callback: typing.Callable[[], None]) -> None:
        """
        One-shot callback for when the value equals `value`. Requires a
        running event loop (internally spawns a background task).

        Args:
            value: The value to wait for.
            callback: Called when the internal value equals `value`.
        """

        task = asyncio.create_task(self.wait_for(value))
        self._background_tasks.add(task)

        def _done(t: asyncio.Task[T]) -> None:
            self._background_tasks.discard(t)
            if not t.cancelled() and t.exception() is None:
                callback()

        task.add_done_callback(_done)

    async def wait_for(
        self,
        value: T,
        *,
        immediate: bool = True,
        timeout: float | None = None,
    ) -> T:
        """
        Wait for the internal value to equal the given value.

        Args:
            value: Return when the internal value is equal to this.
            immediate: If True and the internal value is already equal to the given value, return immediately. Defaults to True.
            timeout: Seconds to wait before raising `asyncio.TimeoutError`. None means wait forever.
        """

        return await self._wait_for_condition(
            lambda v: v == value,
            immediate=immediate,
            timeout=timeout,
        )

    async def wait_for_not(
        self,
        value: T,
        *,
        immediate: bool = True,
        timeout: float | None = None,
    ) -> T:
        """
        Wait for the internal value to not equal the given value.

        Args:
            value: Return when the internal value is not equal to this.
            immediate: If True and the internal value is already not equal to the given value, return immediately. Defaults to True.
            timeout: Seconds to wait before raising `asyncio.TimeoutError`. None means wait forever.
        """

        return await self._wait_for_condition(
            lambda v: v != value,
            immediate=immediate,
            timeout=timeout,
        )

    async def wait_for_not_none(
        self: ValueWatcher[S | None],
        *,
        immediate: bool = True,
        timeout: float | None = None,
    ) -> S:
        """
        Wait for the internal value to be not None.

        Args:
            immediate: If True and the internal value is already not None, return immediately. Defaults to True.
            timeout: Seconds to wait before raising `asyncio.TimeoutError`. None means wait forever.
        """

        result = await self._wait_for_condition(
            lambda v: v is not None,
            immediate=immediate,
            timeout=timeout,
        )
        if result is None:
            raise AssertionError("unreachable")
        return result

    async def _wait_for_condition(
        self,
        condition: typing.Callable[[T], bool],
        *,
        immediate: bool = True,
        timeout: float | None = None,
    ) -> T:
        """
        Wait until `condition(current_value)` is true, then return the
        matching value. Handles the TOCTOU gap between checking the current
        value and subscribing to the change queue.
        """

        # Fast path: no task needed if the value already matches.
        if immediate:
            # Read once to avoid a TOCTOU race between check and return.
            current = self.value
            if condition(current):
                return current

        async def _wait() -> T:
            with self._watch() as queue:
                # Re-check after queue registration to close the gap
                # between the fast path above and the queue being live.
                if immediate:
                    # Read once to avoid a TOCTOU race between check and return.
                    current = self.value
                    if condition(current):
                        return current

                while True:
                    _, new = await queue.get()
                    if condition(new):
                        return new

        return await asyncio.wait_for(_wait(), timeout=timeout)

    async def wait_for_change(
        self,
        *,
        timeout: float | None = None,
    ) -> T:
        """
        Wait for the internal value to change.

        Args:
            timeout: Seconds to wait before raising `asyncio.TimeoutError`. None means wait forever.
        """

        async def _wait() -> T:
            with self._watch() as queue:
                _, new = await queue.get()
                return new

        return await asyncio.wait_for(_wait(), timeout=timeout)

    def _watch(self) -> _WatchContextManager[T]:
        """
        Watch for all changes to the value. This method returns a context
        manager so it must be used in a `with` statement.

        Its return value is a queue that yields tuples of the old and new
        values.
        """

        loop = asyncio.get_running_loop()
        queue = asyncio.Queue[tuple[T, T]]()
        with self._lock:
            self._watch_queues.append((loop, queue))

        return _WatchContextManager(
            on_exit=lambda: self._remove_queue(queue),
            queue=queue,
        )

    def _remove_queue(self, queue: asyncio.Queue[tuple[T, T]]) -> None:
        """
        Remove a queue from the watch list in a thread-safe manner.
        """

        with self._lock:
            self._watch_queues = [
                entry for entry in self._watch_queues if entry[1] is not queue
            ]


class _WatchContextManager(typing.Generic[T]):
    """
    Context manager that's used to automatically delete a queue when it's no
    longer being watched.

    Returns a queue that yields tuples of the old and new values.
    """

    def __init__(
        self,
        on_exit: typing.Callable[[], None],
        queue: asyncio.Queue[tuple[T, T]],
    ) -> None:
        self._on_exit = on_exit
        self._queue = queue

    def __enter__(self) -> asyncio.Queue[tuple[T, T]]:
        # IMPORTANT: Do not return an async generator. That can lead to "Task
        # was destroyed but it is pending!" warnings when the event loop closes.
        return self._queue

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: object,
    ) -> None:
        self._on_exit()

The negated waits read naturally at call sites, and wait_for_not_none is particularly useful since we love type safety:

# Wait until the state is anything other than "disconnected"
await state.wait_for_not("disconnected")

# For Optional values: waits until non-None and narrows the type
ws_watcher = ValueWatcher[Connection | None](None)
ws: Connection = await ws_watcher.wait_for_not_none()

One caveat

The setter deduplicates by equality: if the new value == the current value, no notification fires. This works well for enums, strings, and ints, but mutating a mutable object in place and reassigning the same reference won't trigger consumers (because obj == obj is trivially True). Stick to immutable values and this isn't a concern.
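The sketch below isolates that equality check. `Deduping` is a hypothetical stand-in that keeps only the dedup rule and a notification counter, not the real class.

```python
class Deduping:
    """Hypothetical stand-in: only the equality check from the setter."""

    def __init__(self, initial):
        self._value = initial
        self.notifications = 0

    def set(self, new_value):
        if new_value == self._value:
            return  # Same dedup rule as the ValueWatcher setter
        self._value = new_value
        self.notifications += 1


pending = ["req-1"]
watcher = Deduping(pending)

pending.append("req-2")  # In-place mutation: the stored object changes too
watcher.set(pending)     # Same reference, so new == old and nothing fires
in_place = watcher.notifications

watcher.set(("req-1", "req-2", "req-3"))  # A fresh immutable value does fire
replaced = watcher.notifications

print(in_place, replaced)  # 0 1
```

Replacing the value with a new tuple (or a frozen dataclass) on each change keeps the comparison meaningful.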

Wrapping up

The core insight is simple: asyncio.Condition asks consumers "is the current state what you want?" when it should ask "did the state ever become what you want?" Per-consumer queues make that possible by buffering every transition instead of just notifying about the latest one.

We use ValueWatcher throughout Inngest's Python SDK to coordinate WebSocket connection state, worker lifecycle, and graceful shutdown. If you're managing shared mutable state in asyncio, give it a try.