Asyncio enables concurrent programming in Python. Here are the patterns you'll use constantly.

Basic Concurrency with gather

Run multiple coroutines concurrently:

import asyncio
 
async def fetch_user(user_id):
    await asyncio.sleep(0.1)  # Simulate IO
    return {"id": user_id, "name": f"User {user_id}"}
 
async def main():
    # Run concurrently, wait for all
    results = await asyncio.gather(
        fetch_user(1),
        fetch_user(2),
        fetch_user(3)
    )
    print(results)  # [{"id": 1, ...}, {"id": 2, ...}, {"id": 3, ...}]
 
asyncio.run(main())

TaskGroup (Python 3.11+)

Structured concurrency with automatic cleanup:

import asyncio
 
async def process_item(item):
    await asyncio.sleep(0.1)
    if item == "bad":
        raise ValueError("Bad item")
    return item.upper()
 
async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(process_item("hello"))
        task2 = tg.create_task(process_item("world"))
    
    # All tasks complete or all cancelled on error
    print(task1.result(), task2.result())
 
asyncio.run(main())

Timeout Patterns

import asyncio
 
async def slow_operation():
    await asyncio.sleep(10)
    return "done"
 
async def main():
    # Method 1: wait_for
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=2.0)
    except asyncio.TimeoutError:
        print("Operation timed out")
    
    # Method 2: timeout context (Python 3.11+)
    async with asyncio.timeout(2.0):
        result = await slow_operation()
 
asyncio.run(main())

Semaphore for Rate Limiting

Limit concurrent operations:

import asyncio
 
async def fetch_with_limit(semaphore, url):
    async with semaphore:
        print(f"Fetching {url}")
        await asyncio.sleep(0.5)  # Simulate request
        return f"Response from {url}"
 
async def main():
    semaphore = asyncio.Semaphore(3)  # Max 3 concurrent
    
    urls = [f"https://api.example.com/{i}" for i in range(10)]
    tasks = [fetch_with_limit(semaphore, url) for url in urls]
    results = await asyncio.gather(*tasks)
 
asyncio.run(main())

Queue-Based Producer/Consumer

import asyncio
 
async def producer(queue, items):
    for item in items:
        await queue.put(item)
        print(f"Produced: {item}")
    
    # Signal completion
    await queue.put(None)
 
async def consumer(queue):
    results = []
    while True:
        item = await queue.get()
        if item is None:
            break
        
        # Process item
        await asyncio.sleep(0.1)
        results.append(item.upper())
        queue.task_done()
    
    return results
 
async def main():
    queue = asyncio.Queue(maxsize=5)
    
    items = ["apple", "banana", "cherry"]
    
    producer_task = asyncio.create_task(producer(queue, items))
    consumer_task = asyncio.create_task(consumer(queue))
    
    await producer_task
    results = await consumer_task
    print(results)
 
asyncio.run(main())

Error Handling with gather

import asyncio
 
async def might_fail(x):
    if x == 2:
        raise ValueError("x cannot be 2")
    return x * 2
 
async def main():
    # return_exceptions=True prevents cancellation
    results = await asyncio.gather(
        might_fail(1),
        might_fail(2),
        might_fail(3),
        return_exceptions=True
    )
    
    for result in results:
        if isinstance(result, Exception):
            print(f"Error: {result}")
        else:
            print(f"Success: {result}")
 
asyncio.run(main())

Event Coordination

import asyncio
 
async def waiter(event, name):
    print(f"{name} waiting...")
    await event.wait()
    print(f"{name} proceeding!")
 
async def setter(event):
    await asyncio.sleep(1)
    print("Setting event")
    event.set()
 
async def main():
    event = asyncio.Event()
    
    await asyncio.gather(
        waiter(event, "Task A"),
        waiter(event, "Task B"),
        setter(event)
    )
 
asyncio.run(main())

Lock for Shared State

import asyncio
 
class Counter:
    def __init__(self):
        self.value = 0
        self.lock = asyncio.Lock()
    
    async def increment(self):
        async with self.lock:
            current = self.value
            await asyncio.sleep(0.01)  # Simulate work
            self.value = current + 1
 
async def main():
    counter = Counter()
    
    # Run 100 increments concurrently
    await asyncio.gather(*[counter.increment() for _ in range(100)])
    
    print(counter.value)  # 100 (without lock, could be less)
 
asyncio.run(main())

Periodic Tasks

import asyncio
 
async def periodic_task(interval, callback):
    while True:
        await callback()
        await asyncio.sleep(interval)
 
async def check_health():
    print("Health check running...")
 
async def main():
    # Create periodic task
    task = asyncio.create_task(
        periodic_task(5.0, check_health)
    )
    
    # Let it run for a while
    await asyncio.sleep(12)
    task.cancel()
 
asyncio.run(main())

First Completed Pattern

import asyncio
 
async def fetch_from_server(server, delay):
    await asyncio.sleep(delay)
    return f"Response from {server}"
 
async def main():
    tasks = {
        asyncio.create_task(fetch_from_server("server1", 1.0)),
        asyncio.create_task(fetch_from_server("server2", 0.5)),
        asyncio.create_task(fetch_from_server("server3", 2.0)),
    }
    
    done, pending = await asyncio.wait(
        tasks, 
        return_when=asyncio.FIRST_COMPLETED
    )
    
    # Get first result
    winner = done.pop()
    print(f"First: {winner.result()}")
    
    # Cancel the rest
    for task in pending:
        task.cancel()
 
asyncio.run(main())

Async Context Managers

import asyncio
 
class AsyncDatabase:
    async def __aenter__(self):
        print("Connecting...")
        await asyncio.sleep(0.1)
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Disconnecting...")
        await asyncio.sleep(0.1)
    
    async def query(self, sql):
        await asyncio.sleep(0.1)
        return [{"id": 1}]
 
async def main():
    async with AsyncDatabase() as db:
        results = await db.query("SELECT * FROM users")
        print(results)
 
asyncio.run(main())

Running Sync Code in Threads

import asyncio
import time
 
def blocking_io():
    time.sleep(1)  # Can't await this
    return "result"
 
async def main():
    loop = asyncio.get_running_loop()
    
    # Run in thread pool
    result = await loop.run_in_executor(None, blocking_io)
    print(result)
 
asyncio.run(main())

These patterns cover 90% of asyncio use cases. Master them and concurrent Python becomes straightforward.

React to this post: