asyncio: Async I/O

Write concurrent Python code with asyncio. Covers coroutines, asyncio.run, gather, create_task, timeouts, queues, and avoiding the blocking-call pitfall.

What it is

asyncio is Python’s built-in library for writing single-threaded concurrent code using async/await syntax. Instead of threads or processes, it uses a cooperative event loop: a coroutine yields control when it waits for I/O (await), allowing other coroutines to run in the meantime.

asyncio is the default foundation for httpx's async client, FastAPI, and most other async Python libraries. A few alternatives, such as Trio, provide their own event loop instead.
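The cooperative hand-off described above can be made visible with a minimal sketch. The `worker` coroutine and the `events` list are illustrative names, not part of asyncio; the list simply records the order in which steps run on the single thread.

```python
import asyncio

events: list[str] = []  # records the order in which steps actually run

async def worker(name: str, delay: float) -> None:
    events.append(f"{name} start")
    await asyncio.sleep(delay)   # yields control to the event loop here
    events.append(f"{name} end")

async def main() -> None:
    # Both workers share one thread; the loop switches between them at each await.
    await asyncio.gather(worker("A", 0.2), worker("B", 0.1))

asyncio.run(main())
print(events)  # ['A start', 'B start', 'B end', 'A end']
```

B finishes before A even though A started first, because A gave up control while it was waiting.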

Quick example

import asyncio

async def greet(name: str, delay: float) -> str:
    await asyncio.sleep(delay)      # non-blocking sleep
    msg = f"Hello, {name}!"
    print(msg)
    return msg

asyncio.run(greet("Alice", 0.1))   # entry point for async code

Output:

Hello, Alice!

When / why to use it

  • I/O-bound work: many simultaneous network requests, database queries, or file reads.
  • Building async web servers (FastAPI, aiohttp, Starlette) or async CLI tools.
  • When threads add too much overhead or complexity for your use case.

asyncio does not speed up CPU-bound code; use multiprocessing or concurrent.futures.ProcessPoolExecutor for that.

Common pitfalls

[!WARNING] Calling blocking functions inside async code freezes the event loop. Anything that blocks the thread (time.sleep(), synchronous requests.get(), open() on a slow NFS mount) stops all coroutines until it returns. Use async equivalents (asyncio.sleep, httpx.AsyncClient, aiofiles) or run blocking calls in a worker thread: await asyncio.to_thread(blocking_fn, arg), available in Python 3.9 and later.
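To see the freeze described in this warning, the following sketch times how long a small `ticker` coroutine has to wait for its first turn. `ticker`, `blocking`, `offloaded`, and `first_tick` are hypothetical helper names, and the exact timings will vary by machine.

```python
import asyncio
import time
from typing import Awaitable, Callable

async def ticker(ticks: list, start: float) -> None:
    # Tries to record a timestamp every 50 ms; it only runs when the loop is free.
    for _ in range(3):
        await asyncio.sleep(0.05)
        ticks.append(time.perf_counter() - start)

async def blocking() -> None:
    time.sleep(0.3)                            # blocks the whole event loop

async def offloaded() -> None:
    await asyncio.to_thread(time.sleep, 0.3)   # same sleep, in a worker thread

async def first_tick(job: Callable[[], Awaitable[None]]) -> float:
    ticks: list = []
    start = time.perf_counter()
    await asyncio.gather(job(), ticker(ticks, start))
    return ticks[0]                            # seconds until the ticker first ran

blocked_first = asyncio.run(first_tick(blocking))
offloaded_first = asyncio.run(first_tick(offloaded))
print(f"first tick with time.sleep: {blocked_first:.2f}s")
print(f"first tick with to_thread:  {offloaded_first:.2f}s")
```

With the blocking version the ticker cannot start until the 0.3 s sleep has returned; with `asyncio.to_thread` it should tick after roughly 0.05 s.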

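[!WARNING] Forgetting await: calling a coroutine function without await returns a coroutine object, not a result, and its body never runs. Python emits a RuntimeWarning ("coroutine ... was never awaited") when such an object is discarded, and mypy or pyright can also flag it, most reliably when your functions are annotated.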
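A short sketch of what the missing await looks like at runtime. `compute` is an illustrative coroutine; the example awaits the object afterwards so no warning is raised.

```python
import asyncio

async def compute() -> int:
    await asyncio.sleep(0)
    return 42

async def main() -> tuple[bool, int]:
    pending = compute()                      # no await: the body has not run yet
    is_coro = asyncio.iscoroutine(pending)   # it is just a coroutine object
    value = await pending                    # awaiting it actually runs the body
    return is_coro, value

is_coro, value = asyncio.run(main())
print(is_coro, value)  # True 42
```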

[!TIP] asyncio.run() is the correct entry point for scripts. Never call it inside another async function: it raises RuntimeError when an event loop is already running in the current thread. Use await inside async functions, and reserve asyncio.run() for the outermost call.
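The tip above can be checked with a small sketch. `inner`, `wrong`, and `right` are hypothetical names; `wrong` catches the error only to show it, which real code would not normally do.

```python
import asyncio

async def inner() -> str:
    return "inner result"

async def wrong() -> str:
    coro = inner()
    try:
        asyncio.run(coro)        # a loop is already running in this thread
    except RuntimeError as exc:
        coro.close()             # tidy up so Python does not warn "never awaited"
        return f"RuntimeError: {exc}"
    return "no error"

async def right() -> str:
    return await inner()         # inside async code, just await

wrong_msg = asyncio.run(wrong())
right_msg = asyncio.run(right())
print(wrong_msg)
print(right_msg)
```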

Concurrent execution with gather

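asyncio.gather() runs coroutines concurrently and returns all results when the last one finishes. Results come back in the order the coroutines were passed in, not the order in which they completed.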

import asyncio
import time

async def fetch(n: int, delay: float) -> str:
    await asyncio.sleep(delay)
    return f"result-{n}"

async def main():
    start = time.perf_counter()
    results = await asyncio.gather(
        fetch(1, 0.3),
        fetch(2, 0.2),
        fetch(3, 0.1),
    )
    elapsed = time.perf_counter() - start
    print(results)
    print(f"Done in {elapsed:.2f}s  (sequential would take ~0.60s)")

asyncio.run(main())

Output:

['result-1', 'result-2', 'result-3']
Done in 0.30s  (sequential would take ~0.60s)
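By default, the first exception raised by any coroutine propagates out of the gather call. As a hedged sketch, passing `return_exceptions=True` collects exceptions alongside the normal results instead; the failing `fetch(2, ...)` here is contrived for illustration.

```python
import asyncio

async def fetch(n: int, delay: float) -> str:
    await asyncio.sleep(delay)
    if n == 2:
        raise ValueError(f"fetch {n} failed")   # simulated failure
    return f"result-{n}"

async def main() -> list:
    return await asyncio.gather(
        fetch(1, 0.03),
        fetch(2, 0.02),
        fetch(3, 0.01),
        return_exceptions=True,   # exceptions are returned, not raised
    )

results = asyncio.run(main())
for r in results:
    print("failed:" if isinstance(r, Exception) else "ok:", r)
```

The list still follows argument order, so the exception sits in the second slot even though `fetch(3, ...)` finished first.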

create_task: fire and forget

create_task schedules a coroutine as a background task without awaiting it immediately.

import asyncio

async def background_job(name: str):
    await asyncio.sleep(0.1)
    print(f"[bg] {name} done")

async def main():
    task = asyncio.create_task(background_job("cleanup"))
    print("Main work happening...")
    await asyncio.sleep(0.05)
    print("More main work...")
    await task          # wait for it before exiting

asyncio.run(main())

Output:

Main work happening...
More main work...
[bg] cleanup done

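[!WARNING] If you create a task but never await it (or add it to a gather), it may never finish. The event loop keeps only a weak reference to tasks, so a task with no other reference can be garbage-collected mid-execution, and asyncio.run() cancels any tasks still pending when the main coroutine returns. Always hold a reference and eventually await or cancel tasks.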
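One common way to hold those references is a module-level set that tasks remove themselves from when done. This is a minimal sketch; `spawn`, `background_tasks`, and `finished` are illustrative names rather than asyncio APIs.

```python
import asyncio

background_tasks: set = set()   # strong references to in-flight tasks
finished: list = []

async def background_job(name: str) -> None:
    await asyncio.sleep(0.01)
    finished.append(name)

def spawn(coro) -> asyncio.Task:
    task = asyncio.create_task(coro)
    background_tasks.add(task)                        # keep it alive
    task.add_done_callback(background_tasks.discard)  # drop it once finished
    return task

async def main() -> None:
    for name in ("cleanup", "metrics"):
        spawn(background_job(name))
    # Wait for anything still running before the loop shuts down.
    await asyncio.gather(*background_tasks)

asyncio.run(main())
print(sorted(finished), len(background_tasks))
```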

Timeout

import asyncio

async def slow_op():
    await asyncio.sleep(5)
    return "done"

async def main():
    try:
        result = await asyncio.wait_for(slow_op(), timeout=1.0)
    except asyncio.TimeoutError:
        print("Timed out after 1s")

asyncio.run(main())

Output:

Timed out after 1s
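When the timeout fires, wait_for cancels the awaited coroutine before raising, so the coroutine gets a chance to clean up. The sketch below records that sequence in a `log` list, which is an illustrative name; the timeout is shortened to keep the example fast.

```python
import asyncio

log: list = []

async def slow_op() -> str:
    try:
        await asyncio.sleep(5)
        return "done"
    except asyncio.CancelledError:
        log.append("slow_op cancelled")   # release resources here
        raise                             # re-raise so cancellation completes

async def main() -> None:
    try:
        await asyncio.wait_for(slow_op(), timeout=0.05)
    except asyncio.TimeoutError:
        log.append("timed out")

asyncio.run(main())
print(log)  # ['slow_op cancelled', 'timed out']
```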

Run blocking code in a thread

import asyncio
import time

def blocking_work(n: int) -> int:
    time.sleep(0.1)     # simulates blocking I/O (threads do not speed up CPU-bound work)
    return n * n

async def main():
    # asyncio.to_thread runs a sync function in a worker thread so it doesn't block the loop
    results = await asyncio.gather(*[
        asyncio.to_thread(blocking_work, i) for i in range(5)
    ])
    print(results)

asyncio.run(main())

Output:

[0, 1, 4, 9, 16]

Producer / consumer with Queue

import asyncio

async def producer(queue: asyncio.Queue, items: list):
    for item in items:
        await queue.put(item)
        print(f"Produced: {item}")
        await asyncio.sleep(0.05)
    await queue.put(None)   # sentinel

async def consumer(queue: asyncio.Queue):
    while True:
        item = await queue.get()
        if item is None:
            break
        print(f"  Consumed: {item}")
        queue.task_done()

async def main():
    queue: asyncio.Queue = asyncio.Queue(maxsize=2)
    await asyncio.gather(
        producer(queue, ["a", "b", "c"]),
        consumer(queue),
    )

asyncio.run(main())

Output:

Produced: a
  Consumed: a
Produced: b
  Consumed: b
Produced: c
  Consumed: c
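The task_done() calls in the consumer pair naturally with queue.join(), which waits until every queued item has been marked done. As an alternative to the sentinel, here is a hedged sketch with several workers that are cancelled once the queue drains; `worker` and the doubling step are placeholders for real processing.

```python
import asyncio

async def worker(queue: asyncio.Queue, out: list) -> None:
    while True:
        item = await queue.get()
        try:
            await asyncio.sleep(0.01)   # stand-in for real work
            out.append(item * 2)
        finally:
            queue.task_done()           # always mark the item as handled

async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    out: list = []
    workers = [asyncio.create_task(worker(queue, out)) for _ in range(3)]
    for i in range(6):
        queue.put_nowait(i)
    await queue.join()                  # returns once every item is task_done()
    for w in workers:
        w.cancel()                      # workers are idle in queue.get() by now
    await asyncio.gather(*workers, return_exceptions=True)
    return out

processed = asyncio.run(main())
print(sorted(processed))  # [0, 2, 4, 6, 8, 10]
```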

Quick reference

| Pattern | Code |
| --- | --- |
| Entry point | asyncio.run(main()) |
| Sleep | await asyncio.sleep(seconds) |
| Run concurrently | await asyncio.gather(coro1(), coro2()) |
| Background task | t = asyncio.create_task(coro()) |
| Timeout | await asyncio.wait_for(coro(), timeout=5) |
| Run blocking fn | await asyncio.to_thread(fn, arg) |
| Running event loop (inside a coroutine) | asyncio.get_running_loop() |
| Queue | asyncio.Queue(maxsize=N) |
| Semaphore (limit concurrency) | asyncio.Semaphore(10) |
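The semaphore entry is the only pattern in the table without an example, so here is a minimal sketch of capping concurrency. `limited_fetch` and the `stats` dictionary are illustrative; the limit of 2 is chosen only to make the effect easy to see.

```python
import asyncio

async def limited_fetch(sem: asyncio.Semaphore, n: int, stats: dict) -> int:
    async with sem:                      # at most 2 coroutines inside at once
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.01)        # stand-in for a network call
        stats["active"] -= 1
        return n

async def main() -> tuple:
    sem = asyncio.Semaphore(2)           # created inside the running loop
    stats = {"active": 0, "peak": 0}
    results = await asyncio.gather(
        *(limited_fetch(sem, n, stats) for n in range(6))
    )
    return results, stats["peak"]

results, peak = asyncio.run(main())
print(results, "peak concurrency:", peak)  # [0, 1, 2, 3, 4, 5] peak concurrency: 2
```

All six calls are scheduled at once, but only two are ever in flight together.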