
What Python's asyncio primitives get wrong about shared state
We tried Event, Condition, and Queue. Each one gets closer but still breaks under real concurrency. Here's the observable pattern that finally works.
Aaron Harper · 3/4/2026 · 13 min read
Coordinating concurrent tasks around shared state is one of the most common problems in Python's asyncio. The standard library gives you asyncio.Event and asyncio.Condition, but each has a gap that only shows up under real concurrency pressure. We hit this while building Inngest's Python SDK, where multiple async handlers coordinate around WebSocket connection state.
This post works through each primitive, shows exactly where it breaks, and iterates toward a solution that handles every case we threw at it.
The scenario
Imagine an async Python app managing a connection that moves through states:
disconnected → connecting → connected → closing → closed
One of your concurrent handlers needs to drain pending requests when the connection starts shutting down. It has to wait for the closing state:
```python
state = "disconnected"

async def drain_requests():
    # Need to wait until state == "closing"
    ...
    print("draining pending requests")
```
Simple enough. Let's see how each stdlib tool handles it.
Attempt 1: Polling
The most obvious approach: check the value in a loop.
```python
async def drain_requests():
    while state != "closing":
        await asyncio.sleep(0.1)
    print("draining pending requests")
```
This works. But the tradeoffs are bad:
- Latency vs. efficiency: A short sleep interval wastes CPU cycles. A long one adds latency. There's no good value.
- Duplication: Every consumer reimplements the same polling loop with the same tradeoff.
- No event-driven wake: The consumer runs whether or not anything changed.
We can do better. What we actually want is to sleep until the state changes, not sleep for an arbitrary duration and check.
Attempt 2: asyncio.Event
asyncio.Event is the stdlib's answer to "wake me up when something happens":
```python
closing_event = asyncio.Event()

async def drain_requests():
    await closing_event.wait()
    print("draining pending requests")
```
No polling, no wasted cycles. The handler blocks until the event fires. But Event is boolean: it's either set or unset. Our connection has five states, and drain_requests only cares about one of them. What happens when another handler needs to wait for connected? You need a second event. A third handler waiting for "not disconnected"? A third event with inverted logic. The setter has to know about all of them:
```python
closing_event = asyncio.Event()
connected_event = asyncio.Event()

async def set_state(new_state):
    global state
    state = new_state
    if new_state == "closing":
        closing_event.set()
    if new_state == "connected":
        connected_event.set()
```
Every new condition requires another Event object. The coordination between events is where bugs live. Forget a set() or clear() call and a consumer blocks forever.
Attempt 3: asyncio.Condition
asyncio.Condition lets consumers wait on arbitrary predicates:
```python
state = "disconnected"
condition = asyncio.Condition()

async def drain_requests():
    async with condition:
        await condition.wait_for(lambda: state == "closing")
        print("draining pending requests")
```
One coordination point, arbitrary predicates, no proliferation of Event objects. This is much better.
But it breaks under a common pattern.
The lost update
Condition is designed to check the current value when a consumer wakes up. That's fine when state only moves forward, but it falls apart when transitions are fast. When the setter changes state, it calls notify_all(), which schedules wakeups for every waiting consumer. But in a single-threaded event loop, no consumer actually runs until the current coroutine yields. If the value changes again before that happens, consumers wake up and re-evaluate their predicate against the current value, not the value that triggered the notification. The predicate fails and the consumer goes back to sleep, potentially forever.
```python
# Two transitions in quick succession:
await set_state("closing")  # notify_all() schedules wakeups
await set_state("closed")   # state changes again before consumers run

# drain_requests finally wakes, sees "closed", not "closing".
# Pending requests get silently dropped.
```
Here's a runnable reproduction:
```python
import asyncio

state = "disconnected"
condition = asyncio.Condition()

async def set_state(new_state):
    global state
    async with condition:
        state = new_state
        condition.notify_all()

async def drain_requests():
    async with condition:
        await condition.wait_for(lambda: state == "closing")
        print("draining pending requests")

async def main():
    task = asyncio.create_task(drain_requests())
    await asyncio.sleep(0)  # Let drain_requests start waiting
    await set_state("closing")  # Briefly "closing"...
    await set_state("closed")   # ...then immediately "closed"
    await asyncio.wait_for(task, timeout=1.0)
    # TimeoutError: drain_requests never sees "closing"

asyncio.run(main())
```
The value was "closing", but by the time drain_requests wakes and checks, it's already "closed". The intermediate state is gone.
This isn't a contrived edge case. In our SDK's connection manager, a close signal can arrive and the connection can shut down in the same event loop tick. drain_requests never runs, and any in-flight work just disappears.
The fix: per-consumer queues
Instead of waking consumers and asking "is the current state what you want?", buffer every transition into a per-consumer queue. Each consumer drains its own queue and checks each transition individually. The consumer never misses a state.
Each consumer registers its own asyncio.Queue. When the value changes, the setter pushes (old, new) into every registered queue. Here's a simplified version that illustrates the core idea:
```python
class ValueWatcher:
    def __init__(self, initial_value):
        self._value = initial_value
        self._watch_queues: list[asyncio.Queue] = []

    @property
    def value(self):
        return self._value

    @value.setter
    def value(self, new_value):
        if new_value == self._value:
            return
        old_value = self._value
        self._value = new_value
        # Notify all consumers
        for queue in self._watch_queues:
            queue.put_nowait((old_value, new_value))

    async def wait_for(self, target):
        queue = asyncio.Queue()
        self._watch_queues.append(queue)
        try:
            if self._value == target:
                return
            while True:
                old, new = await queue.get()
                if new == target:
                    return
        finally:
            self._watch_queues.remove(queue)
```
wait_for registers a queue, checks the current value, then drains transitions until it finds a match. The try/finally ensures the queue gets deregistered even if the caller cancels.
The queue buffers and delivers every intermediate transition in order, even if the value changes multiple times before a consumer runs.
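To see the difference, here's the earlier Condition reproduction rewritten against this simplified watcher. The class is inlined again so the snippet runs on its own:

```python
import asyncio

class ValueWatcher:
    """Simplified watcher from above, repeated so this snippet is standalone."""

    def __init__(self, initial_value):
        self._value = initial_value
        self._watch_queues: list[asyncio.Queue] = []

    @property
    def value(self):
        return self._value

    @value.setter
    def value(self, new_value):
        if new_value == self._value:
            return
        old_value = self._value
        self._value = new_value
        # Push the transition into every consumer's queue.
        for queue in self._watch_queues:
            queue.put_nowait((old_value, new_value))

    async def wait_for(self, target):
        queue = asyncio.Queue()
        self._watch_queues.append(queue)
        try:
            if self._value == target:
                return
            while True:
                _, new = await queue.get()
                if new == target:
                    return
        finally:
            self._watch_queues.remove(queue)

state = ValueWatcher("disconnected")

async def drain_requests():
    await state.wait_for("closing")
    print("draining pending requests")

async def main():
    task = asyncio.create_task(drain_requests())
    await asyncio.sleep(0)   # let drain_requests register its queue
    state.value = "closing"  # two transitions in the same event loop tick...
    state.value = "closed"   # ...and the intermediate one is still delivered
    await asyncio.wait_for(task, timeout=1.0)

asyncio.run(main())
```

Where the Condition version timed out, this one prints "draining pending requests": the closing transition was buffered in drain_requests' queue even though the value had already moved on to "closed".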
Making it production-ready
The simplified version above still has gaps. Our final implementation adds the following:
- Thread safety: A `threading.Lock` protects the value and queue list. Each queue is paired with its event loop, and the setter uses `loop.call_soon_threadsafe` instead of `put_nowait` directly.
- Atomic registration: `wait_for` re-checks the current value immediately after registering its queue (which happens under the lock), closing the race where a transition could slip between the initial check and registration.
- Full generic typing: `Generic[T]` end-to-end, so predicates, queues, and return values are all type-checked.
- Predicate-based matching: `wait_for`, `wait_for_not`, and `wait_for_not_none` all route through a shared `_wait_for_condition(predicate)` core.
- Timeouts: Every wait method accepts an optional `timeout`, backed by `asyncio.wait_for`.
- Conditional set: `set_if` atomically sets the value only when the current value satisfies a predicate, useful for state machine transitions that should only happen from a specific state.
- Change watching: `wait_for_change` waits for any transition regardless of value, handy for logging or reacting to state churn.
- Callback API: `on_change` and `on_value` for synchronous consumers alongside the async wait API.
- Resilient notifications: The setter catches `RuntimeError` (closed loop) and suppresses callback exceptions so one failure doesn't block other consumers.
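To make the conditional-set idea concrete before the full source, here's a minimal sketch of `set_if`'s core: just the lock and the conditional swap, with the notification machinery omitted. `ConditionalValue` is a hypothetical stand-in for illustration, not part of the real class:

```python
import threading

class ConditionalValue:
    """Minimal sketch of `set_if`'s core: a lock plus a conditional swap.
    The real ValueWatcher also notifies watchers; that part is omitted here."""

    def __init__(self, initial):
        self._lock = threading.Lock()
        self._value = initial

    def set_if(self, new_value, condition) -> bool:
        # Check and swap under one lock acquisition, so no other thread can
        # change the value between the predicate check and the write.
        with self._lock:
            if not condition(self._value):
                return False
            self._value = new_value
            return True

state = ConditionalValue("connecting")

# Allowed: we're currently "connecting", so the transition fires.
assert state.set_if("connected", lambda v: v == "connecting")

# Rejected: the state already moved on, so this transition is a no-op.
assert not state.set_if("connected", lambda v: v == "connecting")
```

This is why `set_if` is handy for state machines: "move to connected, but only if we're still connecting" can't be expressed race-free with a plain read-then-write.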
The full implementation is about 300 lines, most of which is docstrings and convenience methods built on the same core. Feel free to copy it into your codebase!
Full ValueWatcher source
```python
from __future__ import annotations

import asyncio
import threading
import typing

T = typing.TypeVar("T")

# Used by `wait_for_not_none` to narrow `ValueWatcher[X | None]` to `X`.
S = typing.TypeVar("S")


class ValueWatcher(typing.Generic[T]):
    """
    Thread-safe observable value with async watchers.

    Watchers can await value changes via methods like `wait_for` and
    `wait_for_change`. Alternatively, they can add callbacks via `on_change`
    and `on_value`.

    Any thread can set `.value`, and the watcher will react accordingly.
    """

    def __init__(
        self,
        initial_value: T,
        *,
        on_change: typing.Callable[[T, T], None] | None = None,
    ) -> None:
        """
        Args:
            initial_value: The initial value.
            on_change: Called when the value changes. Good for debug logging.
        """
        self._lock = threading.Lock()
        self._on_changes: list[typing.Callable[[T, T], None]] = []
        if on_change:
            self._on_changes.append(on_change)

        # Every watcher gets its own (loop, queue) pair. Storing the loop lets
        # the setter use `call_soon_threadsafe` for cross-thread notification.
        # Queue items are (old, new) tuples.
        self._watch_queues: list[
            tuple[asyncio.AbstractEventLoop, asyncio.Queue[tuple[T, T]]]
        ] = []

        # Hold references to fire-and-forget tasks to prevent GC.
        self._background_tasks: set[asyncio.Task[T]] = set()

        self._value = initial_value

    @property
    def value(self) -> T:
        with self._lock:
            return self._value

    @value.setter
    def value(self, new_value: T) -> None:
        with self._lock:
            if new_value == self._value:
                return
            old_value = self._value
            self._value = new_value

            # Snapshot lists under lock to avoid iteration issues.
            queues = list(self._watch_queues)
            callbacks = list(self._on_changes)

        # Notify all watchers outside the lock to avoid deadlock.
        for loop, queue in queues:
            try:
                # `call_soon_threadsafe` wakes the target loop's selector
                # immediately. A plain `put_nowait` wouldn't poke the
                # self-pipe, so a cross-thread watcher could stall until
                # something else wakes its loop.
                #
                # In other words, without `call_soon_threadsafe`, a watcher
                # could get the changed value notification long after the
                # value actually changed.
                loop.call_soon_threadsafe(
                    queue.put_nowait, (old_value, new_value)
                )
            except RuntimeError:
                # Target event loop is closed.
                pass

        for on_change in callbacks:
            try:
                on_change(old_value, new_value)
            except Exception:
                # Suppress exceptions from callbacks so one failure doesn't
                # skip the rest.
                pass

    def set_if(
        self,
        new_value: T,
        condition: typing.Callable[[T], bool],
    ) -> bool:
        """
        Atomically set the value only if the current value satisfies the
        condition. Returns True if the value was set.
        """
        with self._lock:
            if not condition(self._value):
                return False
            if new_value == self._value:
                return True
            old_value = self._value
            self._value = new_value
            queues = list(self._watch_queues)
            callbacks = list(self._on_changes)

        for loop, queue in queues:
            try:
                loop.call_soon_threadsafe(
                    queue.put_nowait, (old_value, new_value)
                )
            except RuntimeError:
                pass
        for on_change in callbacks:
            try:
                on_change(old_value, new_value)
            except Exception:
                pass
        return True

    def on_change(self, callback: typing.Callable[[T, T], None]) -> None:
        """
        Add a callback that's called when the value changes.

        Args:
            callback: Called with (old_value, new_value) on each change.
        """
        with self._lock:
            self._on_changes.append(callback)

    def on_value(self, value: T, callback: typing.Callable[[], None]) -> None:
        """
        One-shot callback for when the value equals `value`. Requires a
        running event loop (internally spawns a background task).

        Args:
            value: The value to wait for.
            callback: Called when the internal value equals `value`.
        """
        task = asyncio.create_task(self.wait_for(value))
        self._background_tasks.add(task)

        def _done(t: asyncio.Task[T]) -> None:
            self._background_tasks.discard(t)
            if not t.cancelled() and t.exception() is None:
                callback()

        task.add_done_callback(_done)

    async def wait_for(
        self,
        value: T,
        *,
        immediate: bool = True,
        timeout: float | None = None,
    ) -> T:
        """
        Wait for the internal value to equal the given value.

        Args:
            value: Return when the internal value is equal to this.
            immediate: If True and the internal value is already equal to the
                given value, return immediately. Defaults to True.
            timeout: Seconds to wait before raising `asyncio.TimeoutError`.
                None means wait forever.
        """
        return await self._wait_for_condition(
            lambda v: v == value,
            immediate=immediate,
            timeout=timeout,
        )

    async def wait_for_not(
        self,
        value: T,
        *,
        immediate: bool = True,
        timeout: float | None = None,
    ) -> T:
        """
        Wait for the internal value to not equal the given value.

        Args:
            value: Return when the internal value is not equal to this.
            immediate: If True and the internal value is already not equal to
                the given value, return immediately. Defaults to True.
            timeout: Seconds to wait before raising `asyncio.TimeoutError`.
                None means wait forever.
        """
        return await self._wait_for_condition(
            lambda v: v != value,
            immediate=immediate,
            timeout=timeout,
        )

    async def wait_for_not_none(
        self: ValueWatcher[S | None],
        *,
        immediate: bool = True,
        timeout: float | None = None,
    ) -> S:
        """
        Wait for the internal value to be not None.

        Args:
            immediate: If True and the internal value is already not None,
                return immediately. Defaults to True.
            timeout: Seconds to wait before raising `asyncio.TimeoutError`.
                None means wait forever.
        """
        result = await self._wait_for_condition(
            lambda v: v is not None,
            immediate=immediate,
            timeout=timeout,
        )
        if result is None:
            raise AssertionError("unreachable")
        return result

    async def _wait_for_condition(
        self,
        condition: typing.Callable[[T], bool],
        *,
        immediate: bool = True,
        timeout: float | None = None,
    ) -> T:
        """
        Wait until `condition(current_value)` is true, then return the
        matching value. Handles the TOCTOU gap between checking the current
        value and subscribing to the change queue.
        """
        # Fast path: no task needed if the value already matches.
        if immediate:
            # Read once to avoid a TOCTOU race between check and return.
            current = self.value
            if condition(current):
                return current

        async def _wait() -> T:
            with self._watch() as queue:
                # Re-check after queue registration to close the gap between
                # the fast path above and the queue being live.
                if immediate:
                    # Read once to avoid a TOCTOU race between check and
                    # return.
                    current = self.value
                    if condition(current):
                        return current
                while True:
                    _, new = await queue.get()
                    if condition(new):
                        return new

        return await asyncio.wait_for(_wait(), timeout=timeout)

    async def wait_for_change(
        self,
        *,
        timeout: float | None = None,
    ) -> T:
        """
        Wait for the internal value to change.

        Args:
            timeout: Seconds to wait before raising `asyncio.TimeoutError`.
                None means wait forever.
        """

        async def _wait() -> T:
            with self._watch() as queue:
                _, new = await queue.get()
                return new

        return await asyncio.wait_for(_wait(), timeout=timeout)

    def _watch(self) -> _WatchContextManager[T]:
        """
        Watch for all changes to the value. This method returns a context
        manager so it must be used in a `with` statement.

        Its return value is a queue that yields tuples of the old and new
        values.
        """
        loop = asyncio.get_running_loop()
        queue = asyncio.Queue[tuple[T, T]]()
        with self._lock:
            self._watch_queues.append((loop, queue))
        return _WatchContextManager(
            on_exit=lambda: self._remove_queue(queue),
            queue=queue,
        )

    def _remove_queue(self, queue: asyncio.Queue[tuple[T, T]]) -> None:
        """
        Remove a queue from the watch list in a thread-safe manner.
        """
        with self._lock:
            self._watch_queues = [
                entry for entry in self._watch_queues if entry[1] is not queue
            ]


class _WatchContextManager(typing.Generic[T]):
    """
    Context manager that's used to automatically delete a queue when it's no
    longer being watched.

    Returns a queue that yields tuples of the old and new values.
    """

    def __init__(
        self,
        on_exit: typing.Callable[[], None],
        queue: asyncio.Queue[tuple[T, T]],
    ) -> None:
        self._on_exit = on_exit
        self._queue = queue

    def __enter__(self) -> asyncio.Queue[tuple[T, T]]:
        # IMPORTANT: Do not return an async generator. That can lead to "Task
        # was destroyed but it is pending!" warnings when the event loop
        # closes.
        return self._queue

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: object,
    ) -> None:
        self._on_exit()
```
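The cross-thread trick in the setter can be seen in isolation with nothing but the stdlib. A plain thread hands an item to an asyncio.Queue owned by the main thread's loop via call_soon_threadsafe, which both enqueues the item and wakes the loop's selector:

```python
import asyncio
import threading

async def main():
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue[str] = asyncio.Queue()

    def worker():
        # Runs in a plain thread. Calling queue.put_nowait directly would
        # enqueue the item but not wake the sleeping loop; scheduling the put
        # via call_soon_threadsafe does both.
        loop.call_soon_threadsafe(queue.put_nowait, "changed")

    threading.Thread(target=worker).start()
    item = await queue.get()  # wakes promptly despite the cross-thread put
    print(item)  # changed

asyncio.run(main())
```

This is exactly why ValueWatcher stores a (loop, queue) pair per watcher: the setter may run on any thread, but each queue must be touched from its own loop.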
The predicate variants read nicely at call sites, and wait_for_not_none is particularly useful since we love type safety:
```python
# Wait until the state is anything other than "disconnected"
await state.wait_for_not("disconnected")

# For Optional values: waits until non-None and narrows the type
ws_watcher = ValueWatcher[Connection | None](None)
ws: Connection = await ws_watcher.wait_for_not_none()
```
One caveat
The setter deduplicates by equality: if the new value == the current value, no notification fires. This works well for enums, strings, and ints, but mutating a mutable object in place and reassigning the same reference won't trigger consumers (because obj == obj is trivially True). Stick to immutable values and this isn't a concern.
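Here's a tiny illustration of the caveat, using a stripped-down Holder class (hypothetical, containing only the setter's dedup check and a notification counter):

```python
class Holder:
    """Stripped-down stand-in for ValueWatcher's setter: only the dedup
    check, with a counter in place of real watcher notifications."""

    def __init__(self, initial):
        self._value = initial
        self.notifications = 0

    @property
    def value(self):
        return self._value

    @value.setter
    def value(self, new_value):
        if new_value == self._value:
            return  # deduplicated: no notification fires
        self._value = new_value
        self.notifications += 1

h = Holder(["a"])
items = h.value
items.append("b")  # mutate in place...
h.value = items    # ...reassigning the same object is a silent no-op
print(h.notifications)  # 0: consumers never hear about the change

h.value = ["a", "b", "c"]  # a fresh object with different contents notifies
print(h.notifications)  # 1
```

With immutable values (strings, enums, frozen dataclasses) the in-place mutation path simply doesn't exist, which is why we recommend them.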
Wrapping up
The core insight is simple: asyncio.Condition asks consumers "is the current state what you want?" when it should ask "did the state ever become what you want?" Per-consumer queues make that possible by buffering every transition instead of just notifying about the latest one.
We use ValueWatcher throughout Inngest's Python SDK to coordinate WebSocket connection state, worker lifecycle, and graceful shutdown. If you're managing shared mutable state in asyncio, give it a try.