asyncio: Async I/O#
What it is#
asyncio is Python's built-in library for writing single-threaded concurrent code using async/await syntax. Instead of threads or processes, it uses a cooperative event loop: a coroutine yields control whenever it awaits an operation that is not ready yet (typically I/O), allowing other coroutines to run in the meantime.
It is the foundation for httpx's async mode, FastAPI, and most other async Python libraries.
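To make the cooperative hand-off concrete, here is a minimal sketch in which two coroutines take turns on one thread. The names `worker` and `log` are illustrative only, and `asyncio.sleep(0)` stands in for any real I/O wait.

```python
import asyncio

async def worker(name: str, steps: int, log: list[str]) -> None:
    for i in range(steps):
        log.append(f"{name}:{i}")
        # Each await hands control back to the event loop,
        # which then resumes the other worker.
        await asyncio.sleep(0)

async def main() -> list[str]:
    log: list[str] = []
    await asyncio.gather(worker("A", 2, log), worker("B", 2, log))
    return log

print(asyncio.run(main()))  # ['A:0', 'B:0', 'A:1', 'B:1']
```

The interleaved order shows that neither worker runs to completion before the other starts: each one runs only until its next await.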
Quick example#
import asyncio

async def greet(name: str, delay: float) -> str:
    await asyncio.sleep(delay)  # non-blocking sleep
    msg = f"Hello, {name}!"
    print(msg)
    return msg

asyncio.run(greet("Alice", 0.1))  # entry point for async code
Output:
Hello, Alice!
When / why to use it#
- I/O-bound work: many simultaneous network requests, database queries, or file reads.
- Building async web servers (FastAPI, aiohttp, Starlette) or async CLI tools.
- When threads add too much overhead or complexity for your use case.
asyncio does not speed up CPU-bound code; use multiprocessing or concurrent.futures.ProcessPoolExecutor for that.
Common pitfalls#
[!WARNING] Calling blocking functions inside async code freezes the event loop. Anything that blocks the thread (time.sleep(), a synchronous requests.get(), open() on a slow NFS mount) stops all coroutines until it returns. Use async equivalents (asyncio.sleep, httpx.AsyncClient, aiofiles) or run blocking calls in a thread pool: await asyncio.to_thread(blocking_fn, arg).
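The effect of a blocking call can be observed directly. In this sketch, a hypothetical `ticker` coroutine records a timestamp every 10 ms, and `ticks_during` counts how many ticks happen while a 0.2 s `time.sleep` runs either on the loop's thread or in a worker thread. It assumes Python 3.9+ for `asyncio.to_thread`.

```python
import asyncio
import time

async def ticker(ticks: list[float]) -> None:
    # Records a timestamp every 10 ms, but only while the loop is free to run it.
    while True:
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.01)

async def ticks_during(use_thread: bool) -> int:
    ticks: list[float] = []
    task = asyncio.create_task(ticker(ticks))
    await asyncio.sleep(0)  # let the ticker record its first timestamp
    before = len(ticks)
    if use_thread:
        await asyncio.to_thread(time.sleep, 0.2)  # loop stays responsive
    else:
        time.sleep(0.2)  # loop is frozen for the whole 0.2 s
    after = len(ticks)
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)  # absorb the cancellation
    return after - before

async def main() -> tuple[int, int]:
    blocked = await ticks_during(use_thread=False)
    offloaded = await ticks_during(use_thread=True)
    return blocked, offloaded

blocked, offloaded = asyncio.run(main())
print(f"ticks while blocked: {blocked}, ticks while offloaded: {offloaded}")
```

The blocked run records no ticks at all, while the offloaded run records several; the exact count in the second case depends on timer resolution and machine load.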
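[!WARNING] Forgetting await: calling a coroutine function without await returns a coroutine object, not a result, and its body never runs. Python emits a "coroutine ... was never awaited" RuntimeWarning when the object is discarded, and mypy/pyright can flag the mistake if you annotate return types.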
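A small sketch of the difference, using a hypothetical `compute` coroutine: the bare call only builds a coroutine object, and the body runs once that object is awaited.

```python
import asyncio
import inspect

async def compute() -> int:
    await asyncio.sleep(0)
    return 42

async def main() -> tuple[bool, int]:
    pending = compute()  # no await: nothing has run yet
    was_coroutine = inspect.iscoroutine(pending)
    value = await pending  # now the body actually executes
    return was_coroutine, value

print(asyncio.run(main()))  # (True, 42)
```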
[!TIP] asyncio.run() is the correct entry point for scripts. Never call it inside another async function: use await inside async functions, and reserve asyncio.run() for the outermost call.
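What goes wrong with a nested call can be sketched as follows; `inner` and `outer` are illustrative names. Calling asyncio.run() while a loop is already running raises RuntimeError, whereas a plain await works.

```python
import asyncio

async def inner() -> str:
    return "ok"

async def outer() -> tuple[str, str]:
    error = ""
    coro = inner()
    try:
        asyncio.run(coro)  # wrong: an event loop is already running here
    except RuntimeError as exc:
        error = str(exc)
        coro.close()  # the coroutine never ran; close it to avoid a warning
    result = await inner()  # right: just await it
    return error, result

error, result = asyncio.run(outer())
print(error)
print(result)
```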
Concurrent execution with gather#
asyncio.gather() runs coroutines concurrently and, once the last one finishes, returns their results in the order the coroutines were passed in (not the order in which they completed).
import asyncio
import time

async def fetch(n: int, delay: float) -> str:
    await asyncio.sleep(delay)
    return f"result-{n}"

async def main():
    start = time.perf_counter()
    results = await asyncio.gather(
        fetch(1, 0.3),
        fetch(2, 0.2),
        fetch(3, 0.1),
    )
    elapsed = time.perf_counter() - start
    print(results)
    print(f"Done in {elapsed:.2f}s (sequential would take ~0.60s)")

asyncio.run(main())
Output:
['result-1', 'result-2', 'result-3']
Done in 0.30s (sequential would take ~0.60s)
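By default, the first exception raised by any coroutine propagates out of gather(). When partial results are still useful, passing return_exceptions=True returns exceptions in place of results instead. A minimal sketch, where the failing `fetch(2)` is a made-up stand-in for a request that errors:

```python
import asyncio

async def fetch(n: int) -> str:
    await asyncio.sleep(0.01)
    if n == 2:
        raise ValueError(f"request {n} failed")  # simulated failure
    return f"result-{n}"

async def main() -> list:
    # Exceptions are returned in their slot instead of being raised.
    return await asyncio.gather(
        fetch(1), fetch(2), fetch(3),
        return_exceptions=True,
    )

for item in asyncio.run(main()):
    if isinstance(item, Exception):
        print(f"failed: {item}")
    else:
        print(f"ok: {item}")
```

Each slot in the returned list still corresponds to the coroutine in the same position, so failures can be matched back to their inputs.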
create_task: fire and forget#
create_task schedules a coroutine as a background task without awaiting it immediately.
import asyncio

async def background_job(name: str):
    await asyncio.sleep(0.1)
    print(f"[bg] {name} done")

async def main():
    task = asyncio.create_task(background_job("cleanup"))
    print("Main work happening...")
    await asyncio.sleep(0.05)
    print("More main work...")
    await task  # wait for it before exiting

asyncio.run(main())
Output:
Main work happening...
More main work...
[bg] cleanup done
[!WARNING] The event loop keeps only weak references to tasks. If you create a task and drop the reference, it can be garbage-collected before it finishes, and any task still pending when asyncio.run() returns is cancelled. Always hold a reference and eventually await or cancel tasks.
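One common way to hold those references is a module-level set that each task removes itself from when it finishes. This is a sketch; `spawn`, `background_tasks`, and `job` are hypothetical names, not part of asyncio.

```python
import asyncio

background_tasks: set[asyncio.Task] = set()

def spawn(coro) -> asyncio.Task:
    task = asyncio.create_task(coro)
    background_tasks.add(task)  # strong reference keeps the task alive
    task.add_done_callback(background_tasks.discard)  # drop it once finished
    return task

async def job(n: int, done: list[int]) -> None:
    await asyncio.sleep(0.01)
    done.append(n)

async def main() -> list[int]:
    done: list[int] = []
    for n in range(3):
        spawn(job(n, done))
    # Wait for everything still in flight before main() returns.
    await asyncio.gather(*background_tasks)
    return sorted(done)

print(asyncio.run(main()))  # [0, 1, 2]
```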
Timeout#
import asyncio

async def slow_op():
    await asyncio.sleep(5)
    return "done"

async def main():
    try:
        result = await asyncio.wait_for(slow_op(), timeout=1.0)
    except asyncio.TimeoutError:
        print("Timed out after 1s")

asyncio.run(main())
Output:
Timed out after 1s
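When the timeout expires, wait_for() cancels the wrapped coroutine before raising TimeoutError, so the coroutine gets a chance to clean up. The sketch below records that sequence in a hypothetical `events` list; the shortened timeout is only there to keep the example fast.

```python
import asyncio

async def slow_op(events: list[str]) -> str:
    try:
        await asyncio.sleep(5)
        return "done"
    except asyncio.CancelledError:
        events.append("slow_op cancelled, cleaning up")
        raise  # re-raise so the cancellation propagates

async def main() -> list[str]:
    events: list[str] = []
    try:
        await asyncio.wait_for(slow_op(events), timeout=0.05)
    except asyncio.TimeoutError:
        events.append("timed out")
    return events

print(asyncio.run(main()))
# ['slow_op cancelled, cleaning up', 'timed out']
```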
Run blocking code in a thread#
import asyncio
import time

def blocking_work(n: int) -> int:
    time.sleep(0.1)  # simulates blocking I/O (threads do not speed up CPU-bound work)
    return n * n

async def main():
    # asyncio.to_thread runs a sync function in a worker thread,
    # so the event loop stays free while it blocks
    results = await asyncio.gather(*[
        asyncio.to_thread(blocking_work, i) for i in range(5)
    ])
    print(results)

asyncio.run(main())
Output:
[0, 1, 4, 9, 16]
Producer / consumer with Queue#
import asyncio

async def producer(queue: asyncio.Queue, items: list):
    for item in items:
        await queue.put(item)
        print(f"Produced: {item}")
        await asyncio.sleep(0.05)
    await queue.put(None)  # sentinel

async def consumer(queue: asyncio.Queue):
    while True:
        item = await queue.get()
        if item is None:
            break
        print(f" Consumed: {item}")
        queue.task_done()

async def main():
    queue: asyncio.Queue = asyncio.Queue(maxsize=2)
    await asyncio.gather(
        producer(queue, ["a", "b", "c"]),
        consumer(queue),
    )

asyncio.run(main())
Output:
Produced: a
 Consumed: a
Produced: b
 Consumed: b
Produced: c
 Consumed: c
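An alternative to the sentinel is to let several consumers share one queue and use queue.join(), which returns once task_done() has been called for every item. A sketch under those assumptions, with illustrative names (`worker`, `out`) and a short sleep standing in for real processing:

```python
import asyncio

async def worker(name: str, queue: asyncio.Queue, out: list) -> None:
    while True:
        item = await queue.get()
        try:
            await asyncio.sleep(0.01)  # pretend to process the item
            out.append((name, item))
        finally:
            queue.task_done()  # mark the item as handled even on failure

async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    out: list = []
    for item in range(6):
        queue.put_nowait(item)
    workers = [
        asyncio.create_task(worker(f"w{i}", queue, out)) for i in range(3)
    ]
    await queue.join()  # blocks until every item has been marked done
    for w in workers:
        w.cancel()  # workers loop forever, so stop them explicitly
    await asyncio.gather(*workers, return_exceptions=True)
    return out

processed = asyncio.run(main())
print(sorted(item for _, item in processed))  # [0, 1, 2, 3, 4, 5]
```

Which worker handles which item is not fixed, so the example only checks that every item was processed exactly once.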
Quick reference#
| Pattern | Code |
|---|---|
| Entry point | asyncio.run(main()) |
| Sleep | await asyncio.sleep(seconds) |
| Run concurrently | await asyncio.gather(coro1(), coro2()) |
| Background task | t = asyncio.create_task(coro()) |
| Timeout | await asyncio.wait_for(coro(), timeout=5) |
| Run blocking fn | await asyncio.to_thread(fn, arg) |
| Current event loop (inside a coroutine) | asyncio.get_running_loop() |
| Queue | asyncio.Queue(maxsize=N) |
| Semaphore (limit concurrency) | asyncio.Semaphore(10) |
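The Semaphore row is the only pattern in the table without an example above, so here is a minimal sketch of limiting concurrency. The `state` dict exists only to measure how many coroutines are inside the guarded section at once; `fetch` is a stand-in for a real request.

```python
import asyncio

async def fetch(n: int, sem: asyncio.Semaphore, state: dict) -> int:
    async with sem:  # at most `limit` coroutines get past this point at once
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.01)  # stand-in for a network call
        state["active"] -= 1
    return n

async def main(limit: int = 3, jobs: int = 10) -> tuple[list[int], int]:
    sem = asyncio.Semaphore(limit)
    state = {"active": 0, "peak": 0}
    results = await asyncio.gather(*(fetch(n, sem, state) for n in range(jobs)))
    return results, state["peak"]

results, peak = asyncio.run(main())
print(results)
print(f"peak concurrency: {peak}")
```

All ten jobs are scheduled immediately, but the semaphore keeps the number running the guarded section at or below the limit.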