asyncio Queues (Worker Pools)
Queues Provide Backpressure
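A bounded asyncio.Queue lets producers and consumers run at different speeds without unbounded memory growth: once the queue holds maxsize items, put() suspends the producer until a consumer takes one. Combined with a fixed number of worker tasks, this is a practical way to cap concurrency.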
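To make the blocking behavior concrete, here is a minimal sketch in which a producer tries to put four items into a queue with maxsize=2. The names demo_backpressure, producer, and events are illustrative, and the 0.05 second pause is just an assumption that gives the producer time to fill the queue.

```python
import asyncio


async def demo_backpressure() -> list[str]:
    events: list[str] = []
    q = asyncio.Queue(maxsize=2)  # small bound so the effect is visible

    async def producer() -> None:
        for i in range(4):
            await q.put(i)  # suspends here while the queue is full
            events.append(f"put {i}")

    task = asyncio.create_task(producer())
    await asyncio.sleep(0.05)  # give the producer time to fill the queue
    events.append(f"full={q.full()} size={q.qsize()}")

    # Draining the queue frees slots, which lets the producer resume.
    for _ in range(4):
        item = await q.get()
        events.append(f"got {item}")
        q.task_done()
    await task
    return events


if __name__ == "__main__":
    for line in asyncio.run(demo_backpressure()):
        print(line)
```

The producer records only two puts before the queue reports full; the remaining puts happen only after the consumer side starts taking items.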
Worker Pool with Queue
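import asyncio

async def worker(q: asyncio.Queue) -> None:
    while True:
        item = await q.get()
        try:
            # Placeholder for real work on `item`.
            await asyncio.sleep(0.01)
        finally:
            # Always mark the item done so q.join() can return.
            q.task_done()

async def main() -> None:
    # Bounded queue: put() suspends once 100 items are waiting.
    q = asyncio.Queue(maxsize=100)
    workers = [asyncio.create_task(worker(q)) for _ in range(5)]
    for i in range(1000):
        await q.put(i)
    # Wait until every queued item has been processed.
    await q.join()
    # Workers loop forever, so cancel them and wait for the cancellations to finish.
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)

asyncio.run(main())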
Operational Checklist
- Set maxsize to enforce backpressure.
- Always call task_done() in a finally block.
- Cancel workers cleanly on shutdown.
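The three checklist items can be combined in one small helper. This is a sketch under stated assumptions, not a definitive implementation: run_pool, its parameters, and the doubling "work" are hypothetical names and placeholders chosen for illustration.

```python
import asyncio


async def run_pool(items, n_workers: int = 3, maxsize: int = 10) -> list:
    q = asyncio.Queue(maxsize=maxsize)  # bounded: put() applies backpressure
    results = []

    async def worker() -> None:
        while True:
            item = await q.get()
            try:
                await asyncio.sleep(0)  # placeholder for real async work
                results.append(item * 2)
            finally:
                q.task_done()  # runs even if processing raises

    workers = [asyncio.create_task(worker()) for _ in range(n_workers)]
    for item in items:
        await q.put(item)
    await q.join()  # every item has been marked done

    for w in workers:
        w.cancel()
    # Wait for the cancellations to be delivered; return_exceptions=True
    # collects each CancelledError instead of re-raising it here.
    await asyncio.gather(*workers, return_exceptions=True)
    assert all(w.cancelled() for w in workers)
    return sorted(results)


if __name__ == "__main__":
    print(asyncio.run(run_pool(range(5))))
```

Awaiting the cancelled tasks before returning means no worker is still pending when the event loop shuts down.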
Failure Modes
- Unbounded queue: memory grows until OOM under bursts.
- Missing task_done: join() hangs forever.
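The second failure mode is easy to reproduce. The sketch below wraps join() in asyncio.wait_for so the hang shows up as a timeout instead of blocking the program; join_completes and its call_task_done flag are hypothetical names used only for this demonstration.

```python
import asyncio


async def join_completes(call_task_done: bool, timeout: float = 0.2) -> bool:
    q = asyncio.Queue(maxsize=1)
    q.put_nowait("job")

    async def worker() -> None:
        await q.get()
        if call_task_done:
            q.task_done()  # without this, the unfinished-task count never reaches zero

    task = asyncio.create_task(worker())
    try:
        await asyncio.wait_for(q.join(), timeout=timeout)
        return True
    except asyncio.TimeoutError:
        return False  # join() would have waited forever without the timeout
    finally:
        await task


if __name__ == "__main__":
    print(asyncio.run(join_completes(True)))
    print(asyncio.run(join_completes(False)))
```

In both runs the worker takes the item off the queue; only the run that calls task_done() lets join() return.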