Asyncio enables concurrent programming in Python. Here are the patterns you'll use constantly.
Basic Concurrency with gather
Run multiple coroutines concurrently:
import asyncio
async def fetch_user(user_id):
    """Simulate fetching one user record over the network."""
    await asyncio.sleep(0.1)  # stand-in for real I/O latency
    return dict(id=user_id, name=f"User {user_id}")
async def main():
    """Fetch three users concurrently and print the combined results."""
    # gather() schedules every coroutine at once and returns results
    # in the same order the coroutines were passed in.
    lookups = [fetch_user(uid) for uid in (1, 2, 3)]
    results = await asyncio.gather(*lookups)
    print(results)  # [{"id": 1, ...}, {"id": 2, ...}, {"id": 3, ...}]
asyncio.run(main())

TaskGroup (Python 3.11+)
Structured concurrency with automatic cleanup:
import asyncio
async def process_item(item):
    """Uppercase an item after a simulated delay; the "bad" item is rejected."""
    await asyncio.sleep(0.1)
    if item != "bad":
        return item.upper()
    raise ValueError("Bad item")
async def main():
    """Run two items through process_item inside a TaskGroup."""
    # Leaving the `async with` block waits for every task; if one raises,
    # the siblings are cancelled and the group re-raises the error.
    async with asyncio.TaskGroup() as tg:
        first = tg.create_task(process_item("hello"))
        second = tg.create_task(process_item("world"))
    # Both tasks are guaranteed finished here, so .result() is safe.
    print(first.result(), second.result())
asyncio.run(main())

Timeout Patterns
import asyncio
async def slow_operation():
    """Pretend to do ten seconds of I/O, then report completion."""
    await asyncio.sleep(10)  # deliberately longer than any demo timeout
    return "done"
async def main():
    """Demonstrate two ways to bound an awaitable with a timeout."""
    # Method 1: wait_for
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=2.0)
    except asyncio.TimeoutError:
        print("Operation timed out")
    # Method 2: timeout context (Python 3.11+)
    # FIX: asyncio.timeout() raises TimeoutError when the deadline expires;
    # the original left it unhandled, so the demo crashed out of asyncio.run().
    try:
        async with asyncio.timeout(2.0):
            result = await slow_operation()
    except TimeoutError:
        print("Operation timed out")
asyncio.run(main())

Semaphore for Rate Limiting
Limit concurrent operations:
import asyncio
async def fetch_with_limit(semaphore, url):
    """Fetch one URL while holding a slot in `semaphore`."""
    async with semaphore:
        # Only as many coroutines as the semaphore allows run this section.
        print(f"Fetching {url}")
        await asyncio.sleep(0.5)  # simulated request latency
        return f"Response from {url}"
async def main():
    """Fetch ten URLs with at most three requests in flight at once."""
    semaphore = asyncio.Semaphore(3)  # Max 3 concurrent
    tasks = []
    for i in range(10):
        tasks.append(fetch_with_limit(semaphore, f"https://api.example.com/{i}"))
    results = await asyncio.gather(*tasks)
asyncio.run(main())

Queue-Based Producer/Consumer
import asyncio
async def producer(queue, items):
    """Feed every item into the queue, then enqueue a None sentinel."""
    for item in items:
        await queue.put(item)
        print(f"Produced: {item}")
    # None tells the consumer that no more items are coming.
    await queue.put(None)
async def consumer(queue):
    """Drain the queue until the None sentinel; return the processed items."""
    results = []
    while True:
        item = await queue.get()
        if item is None:
            # FIX: the sentinel get() must also be matched by task_done(),
            # otherwise any queue.join() caller would block forever.
            queue.task_done()
            break
        # Process item
        await asyncio.sleep(0.1)
        results.append(item.upper())
        queue.task_done()
    return results
async def main():
    """Wire a producer and a consumer together through a bounded queue."""
    queue = asyncio.Queue(maxsize=5)  # bounded: put() blocks when full
    fruits = ["apple", "banana", "cherry"]
    prod = asyncio.create_task(producer(queue, fruits))
    cons = asyncio.create_task(consumer(queue))
    await prod
    print(await cons)
asyncio.run(main())

Error Handling with gather
import asyncio
async def might_fail(x):
    """Return x doubled; raises ValueError for the forbidden value 2."""
    if x != 2:
        return x * 2
    raise ValueError("x cannot be 2")
async def main():
    """Collect successes and failures side by side from one gather call."""
    # return_exceptions=True delivers a raised exception as a result value,
    # so a single failure does not cancel the sibling coroutines.
    outcomes = await asyncio.gather(
        might_fail(1),
        might_fail(2),
        might_fail(3),
        return_exceptions=True,
    )
    for outcome in outcomes:
        if isinstance(outcome, Exception):
            print(f"Error: {outcome}")
        else:
            print(f"Success: {outcome}")
asyncio.run(main())

Event Coordination
import asyncio
async def waiter(event, name):
    """Block until `event` is set, logging before and after the wait."""
    print(f"{name} waiting...")
    await event.wait()  # suspends until some other task calls event.set()
    print(f"{name} proceeding!")
async def setter(event):
    """Set `event` after a one-second delay, releasing all waiters."""
    await asyncio.sleep(1)
    print("Setting event")
    event.set()
async def main():
    """Two waiters parked on one event, released by a single setter."""
    event = asyncio.Event()
    pending = [waiter(event, label) for label in ("Task A", "Task B")]
    await asyncio.gather(*pending, setter(event))
asyncio.run(main())

Lock for Shared State
import asyncio
class Counter:
    """A counter whose increments are serialized by an asyncio.Lock."""

    def __init__(self):
        self.value = 0
        self.lock = asyncio.Lock()

    async def increment(self):
        """Read-modify-write under the lock so updates never interleave."""
        async with self.lock:
            snapshot = self.value
            await asyncio.sleep(0.01)  # simulate work mid-update
            self.value = snapshot + 1
async def main():
    """Run 100 concurrent increments; the lock keeps the count exact."""
    counter = Counter()
    jobs = [counter.increment() for _ in range(100)]
    await asyncio.gather(*jobs)
    print(counter.value)  # 100 (without lock, could be less)
asyncio.run(main())

Periodic Tasks
import asyncio
async def periodic_task(interval, callback):
    """Invoke `callback` forever, pausing `interval` seconds between runs.

    Never returns; stop it by cancelling the task that runs it.
    """
    while True:
        await callback()
        await asyncio.sleep(interval)
async def check_health():
    """Placeholder health probe; only logs that it ran."""
    print("Health check running...")
async def main():
    """Run the health check every 5 seconds, then cancel after 12 seconds."""
    # Create periodic task
    heartbeat = asyncio.create_task(periodic_task(5.0, check_health))
    await asyncio.sleep(12)  # long enough for roughly three runs
    heartbeat.cancel()
asyncio.run(main())

First Completed Pattern
import asyncio
async def fetch_from_server(server, delay):
    """Simulate a server whose response arrives after `delay` seconds."""
    await asyncio.sleep(delay)
    return f"Response from {server}"
async def main():
    """Race three servers and keep only the fastest response."""
    specs = [("server1", 1.0), ("server2", 0.5), ("server3", 2.0)]
    tasks = {asyncio.create_task(fetch_from_server(s, d)) for s, d in specs}
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED,
    )
    # Get first result
    winner = done.pop()
    print(f"First: {winner.result()}")
    # The slower requests are no longer needed.
    for task in pending:
        task.cancel()
asyncio.run(main())

Async Context Managers
import asyncio
class AsyncDatabase:
    """Toy async context manager standing in for a database connection."""

    async def __aenter__(self):
        print("Connecting...")
        await asyncio.sleep(0.1)  # simulate connection handshake
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Disconnecting...")
        await asyncio.sleep(0.1)  # simulate teardown

    async def query(self, sql):
        """Pretend to execute `sql`; always returns a single stub row."""
        await asyncio.sleep(0.1)
        return [{"id": 1}]
async def main():
    """Open the database, run one query, and print the rows."""
    async with AsyncDatabase() as db:
        rows = await db.query("SELECT * FROM users")
        print(rows)
asyncio.run(main())

Running Sync Code in Threads
import asyncio
import time
def blocking_io():
    """A synchronous call that blocks for a second; it cannot be awaited."""
    time.sleep(1)
    return "result"
async def main():
    """Offload blocking work to the default thread pool and await it."""
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default ThreadPoolExecutor.
    outcome = await loop.run_in_executor(None, blocking_io)
    print(outcome)
asyncio.run(main())

These patterns cover 90% of asyncio use cases. Master them and concurrent Python becomes straightforward.
React to this post: