Async Python: asyncio, aiohttp, and Concurrent Patterns

Asynchronous programming in Python is not about running things in parallel. It is about doing other work while you wait. When your code is blocked waiting for a network response or a database query, async lets another task run in that gap. Understanding this distinction is the key to using asyncio correctly.

The Event Loop and Coroutines

asyncio is built around an event loop, a single-threaded scheduler that runs coroutines cooperatively. A coroutine function is defined with async def; calling it returns a coroutine object, which does not run until it is awaited.

import asyncio

async def greet(name: str) -> str:
    await asyncio.sleep(1)   # non-blocking wait — yields control to the loop
    return f"Hello, {name}"

async def main():
    result = await greet("Muneer")
    print(result)

asyncio.run(main())  # always use asyncio.run() as the entry point

asyncio.sleep() suspends the coroutine and gives the event loop a chance to run other tasks, unlike time.sleep(), which blocks the entire thread.

Running Tasks Concurrently

The most important asyncio primitive is asyncio.gather(). It runs multiple coroutines concurrently and returns their results, in the order the coroutines were passed, once all of them complete.

import asyncio
import aiohttp

async def fetch_url(session: aiohttp.ClientSession, url: str) -> dict:
    async with session.get(url) as response:
        return await response.json()

async def fetch_all(urls: list[str]) -> list[dict]:
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        return await asyncio.gather(*tasks)

async def main():
    urls = [
        "https://jsonplaceholder.typicode.com/posts/1",
        "https://jsonplaceholder.typicode.com/posts/2",
        "https://jsonplaceholder.typicode.com/posts/3",
    ]
    results = await fetch_all(urls)
    print(f"Fetched {len(results)} posts concurrently")

asyncio.run(main())

Without async, three sequential HTTP requests at 200ms each would take 600ms. With gather, they take ~200ms total.
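
You can verify the timing claim without touching the network: this sketch substitutes asyncio.sleep() for the HTTP call, so the only thing it measures is how the waits overlap.

```python
import asyncio
import time

async def fake_request(delay: float) -> float:
    # stand-in for a network call: waits without blocking the loop
    await asyncio.sleep(delay)
    return delay

async def timed() -> float:
    start = time.perf_counter()
    # three 0.2 s "requests" overlap instead of running back to back
    await asyncio.gather(fake_request(0.2), fake_request(0.2), fake_request(0.2))
    return time.perf_counter() - start

elapsed = asyncio.run(timed())
print(f"elapsed: {elapsed:.2f}s")  # ~0.2 s, not 0.6 s
```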

asyncio.create_task(): Fire and Forget

When you want a coroutine to start running in the background without awaiting it right away:

async def background_job(item_id: int) -> None:
    await asyncio.sleep(2)
    print(f"Processed {item_id}")

async def main():
    # Create tasks — they are scheduled immediately and start at the next await
    task1 = asyncio.create_task(background_job(1))
    task2 = asyncio.create_task(background_job(2))

    print("Tasks created, doing other work...")
    await asyncio.sleep(0.1)

    # Wait for both to finish
    await task1
    await task2

Important: Always keep a reference to a task. If a task is garbage-collected before it finishes, it will be cancelled silently.
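
One way to honor that rule (a sketch, not a prescribed API) is a module-level set that holds each task until it completes:

```python
import asyncio

background_tasks: set[asyncio.Task] = set()

async def background_job(item_id: int) -> int:
    await asyncio.sleep(0.01)
    return item_id

def spawn(coro) -> asyncio.Task:
    task = asyncio.create_task(coro)
    # hold a strong reference so the task can't be garbage-collected mid-flight
    background_tasks.add(task)
    # drop the reference automatically once the task finishes
    task.add_done_callback(background_tasks.discard)
    return task

async def main() -> list[int]:
    tasks = [spawn(background_job(i)) for i in range(3)]
    return await asyncio.gather(*tasks)

results = asyncio.run(main())
print(results)  # [0, 1, 2]
```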

Error Handling with gather

By default, gather propagates the first exception immediately to the awaiting code — but the remaining tasks are not cancelled; they keep running in the background. Use return_exceptions=True to get all results (including exceptions) and handle them yourself.

async def risky_fetch(session: aiohttp.ClientSession, url: str) -> dict:
    async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as r:
        r.raise_for_status()
        return await r.json()

async def main():
    urls = ["https://valid.example.com/api", "https://invalid-url-xyz.com"]

    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            *[risky_fetch(session, url) for url in urls],
            return_exceptions=True,
        )

    for url, result in zip(urls, results):
        if isinstance(result, Exception):
            print(f"Failed {url}: {result}")
        else:
            print(f"Success {url}: {result}")
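
The same behavior is easy to observe offline; since the URLs above are placeholders, this sketch uses plain local coroutines instead of HTTP calls:

```python
import asyncio

async def might_fail(n: int) -> int:
    # odd inputs raise, even inputs succeed
    if n % 2:
        raise ValueError(f"bad input: {n}")
    return n * 10

async def main() -> list:
    # return_exceptions=True: exceptions come back as values instead of raising
    return await asyncio.gather(
        *[might_fail(n) for n in range(4)],
        return_exceptions=True,
    )

results = asyncio.run(main())
for r in results:
    print("error:" if isinstance(r, Exception) else "ok:", r)
```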

Limiting Concurrency with asyncio.Semaphore

Running 1000 requests concurrently will overwhelm most servers and get you rate-limited or banned. Use a Semaphore to cap concurrent operations.

async def fetch_with_limit(
    session: aiohttp.ClientSession,
    url: str,
    semaphore: asyncio.Semaphore,
) -> dict:
    async with semaphore:   # only N tasks pass this gate at a time
        async with session.get(url) as response:
            return await response.json()

async def main():
    urls = [f"https://api.example.com/item/{i}" for i in range(100)]
    semaphore = asyncio.Semaphore(10)  # max 10 concurrent requests

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_limit(session, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks)
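
A quick way to convince yourself the gate works — pure asyncio, no network — is to track the peak number of workers inside the semaphore at once:

```python
import asyncio

active = 0
peak = 0

async def worker(sem: asyncio.Semaphore) -> None:
    global active, peak
    async with sem:                 # at most 3 workers inside at a time
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01)   # simulated I/O
        active -= 1

async def main() -> None:
    sem = asyncio.Semaphore(3)
    await asyncio.gather(*(worker(sem) for _ in range(10)))

asyncio.run(main())
print(f"peak concurrency: {peak}")  # 3, never 10
```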

Async Context Managers and Iterators

You can make your own classes work with async with and async for:

class AsyncDatabaseConnection:
    async def __aenter__(self):
        self.conn = await create_connection()
        return self.conn

    async def __aexit__(self, *args):
        await self.conn.close()

async def get_articles():
    async with AsyncDatabaseConnection() as conn:
        return await conn.fetch("SELECT * FROM articles")

# Async generator for streaming results
async def stream_articles(conn):
    async for row in conn.cursor("SELECT * FROM articles"):
        yield row

async def main():
    async with AsyncDatabaseConnection() as conn:
        async for article in stream_articles(conn):
            process(article)
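
create_connection above is a placeholder, so here is a self-contained sketch of the same protocol with a fake connection object standing in for a real driver:

```python
import asyncio

class FakeConnection:
    """Illustrative stand-in for a real database driver connection."""
    def __init__(self):
        self.closed = False

    async def fetch(self, query: str) -> list[dict]:
        await asyncio.sleep(0)      # pretend I/O
        return [{"id": 1}, {"id": 2}]

    async def close(self) -> None:
        self.closed = True

class AsyncDatabaseConnection:
    async def __aenter__(self):
        self.conn = FakeConnection()  # a real version would await a connect call
        return self.conn

    async def __aexit__(self, *args):
        await self.conn.close()       # runs even if the body raised

async def stream_rows(conn):
    # async generator: yields rows one at a time
    for row in await conn.fetch("SELECT * FROM articles"):
        yield row

async def main() -> list[dict]:
    async with AsyncDatabaseConnection() as conn:
        return [row async for row in stream_rows(conn)]

rows = asyncio.run(main())
print(rows)  # [{'id': 1}, {'id': 2}]
```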

Common Pitfalls

1. Mixing sync blocking calls into async code

import time

async def bad_sleep():
    time.sleep(2)       # WRONG — blocks the entire event loop

async def good_sleep():
    await asyncio.sleep(2)  # correct — yields to event loop

If you must call a blocking function (CPU-heavy computation, sync library), run it in an executor:

import asyncio
from concurrent.futures import ThreadPoolExecutor

def blocking_compute(n: int) -> int:
    return sum(i ** 2 for i in range(n))

async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, blocking_compute, 1_000_000)
    print(result)
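
On Python 3.9+, asyncio.to_thread() wraps this pattern in a single call, using the loop's default thread pool:

```python
import asyncio
import time

def blocking_io() -> str:
    time.sleep(0.05)   # simulates a blocking sync call
    return "done"

async def main() -> str:
    # runs blocking_io in a worker thread without blocking the event loop
    return await asyncio.to_thread(blocking_io)

result = asyncio.run(main())
print(result)  # done
```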

2. Creating a new ClientSession per request

# Bad — opens and closes a connection pool on every call
async def bad_fetch(url: str):
    async with aiohttp.ClientSession() as session:  # expensive!
        async with session.get(url) as r:
            return await r.json()

# Good — share one session across all requests
async def good_fetch(session: aiohttp.ClientSession, url: str):
    async with session.get(url) as r:
        return await r.json()

async def main():
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            good_fetch(session, url1),
            good_fetch(session, url2),
        )

3. Forgetting to await

Calling a coroutine function without await creates a coroutine object but never runs it. Python emits a "coroutine was never awaited" RuntimeWarning, but the operation is silently skipped.

async def save(data): ...

# Bug — creates a coroutine object; save() never actually runs
save(data)

# Correct (only valid inside an async function)
await save(data)

When NOT to Use asyncio

  • CPU-bound work: asyncio does not help with computation. Use multiprocessing or concurrent.futures.ProcessPoolExecutor.
  • Simple scripts: If you're writing a script that makes one API call, plain requests is fine. async adds complexity that only pays off with concurrency.
  • Libraries with their own concurrency models: SQLAlchemy (in sync mode) and Celery manage their own threading or worker pools. Mixing async into them needs care.
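
For the CPU-bound case, the ProcessPoolExecutor route looks like this (a sketch; the __main__ guard matters because worker processes may re-import the module):

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_heavy(n: int) -> int:
    # pure computation: a process pool gives real parallelism across cores
    return sum(i * i for i in range(n))

async def main() -> list[int]:
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        futures = [loop.run_in_executor(pool, cpu_heavy, n) for n in (10, 100)]
        return await asyncio.gather(*futures)

if __name__ == "__main__":
    print(asyncio.run(main()))  # [285, 328350]
```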

Conclusion

Async Python excels at I/O-bound tasks: HTTP calls, database queries, file reads, websocket connections. The key primitives are asyncio.gather for concurrent tasks, Semaphore for rate limiting, and aiohttp.ClientSession for HTTP. Avoid blocking calls inside async functions and keep one shared session per application lifetime.

