asyncio enables concurrent I/O on a single thread using cooperative multitasking — no thread pool required. Here's how to use it effectively.
Basic Coroutines
import asyncio


async def hello():
    """Print "Hello", pause one second without blocking the loop, print "World"."""
    print("Hello")
    await asyncio.sleep(1)  # yields control to the event loop while waiting
    print("World")

# Run the coroutine from synchronous code (creates and closes an event loop)
asyncio.run(hello())

Multiple Coroutines
import asyncio


async def fetch(name, delay):
    """Simulate an I/O-bound fetch: wait `delay` seconds, return a result string."""
    print(f"{name} starting")
    await asyncio.sleep(delay)
    print(f"{name} done")
    return f"{name} result"


async def main():
    # gather() runs all three coroutines concurrently: total time is the
    # longest delay (~3s), not the sum. Results come back in argument order.
    results = await asyncio.gather(
        fetch("A", 2),
        fetch("B", 1),
        fetch("C", 3),
    )
    print(results)  # ['A result', 'B result', 'C result']
asyncio.run(main())

Tasks
import asyncio


async def background_task():
    """Loop forever, printing a heartbeat once per second, until cancelled."""
    while True:
        print("Working...")
        await asyncio.sleep(1)


async def main():
    # Create task (it is scheduled immediately on the running loop)
    task = asyncio.create_task(background_task())
    # Do other work while the task runs in the background
    await asyncio.sleep(3)
    # Cancel the task; CancelledError is raised at its next await point
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Task cancelled")
asyncio.run(main())

Gather with Error Handling
import asyncio


async def might_fail(n):
    """Return `n` after a short delay; raise ValueError when n == 2."""
    if n == 2:
        raise ValueError("Error on 2")
    await asyncio.sleep(0.1)
    return n


async def main():
    # return_exceptions=True makes gather() deliver raised exceptions as
    # ordinary result values instead of aborting the whole batch.
    results = await asyncio.gather(
        might_fail(1),
        might_fail(2),
        might_fail(3),
        return_exceptions=True
    )
    for result in results:
        if isinstance(result, Exception):
            print(f"Error: {result}")
        else:
            print(f"Success: {result}")
asyncio.run(main())

Timeouts
import asyncio


async def slow_operation():
    """Pretend to be a long-running operation (10 seconds)."""
    await asyncio.sleep(10)
    return "done"


async def main():
    try:
        # wait_for cancels slow_operation() once the timeout elapses.
        # (asyncio.TimeoutError is an alias of the builtin TimeoutError
        # since Python 3.11.)
        result = await asyncio.wait_for(slow_operation(), timeout=2)
    except asyncio.TimeoutError:
        print("Operation timed out")
asyncio.run(main())

TaskGroup (Python 3.11+)
import asyncio


async def fetch(url):
    """Simulate fetching a URL: wait one second, return a fake payload."""
    await asyncio.sleep(1)
    return f"data from {url}"


async def main():
    # TaskGroup (Python 3.11+) runs the tasks concurrently and waits for
    # all of them when the `async with` block exits; if one fails, the
    # others are cancelled and the errors surface as an ExceptionGroup.
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(fetch("url1"))
        task2 = tg.create_task(fetch("url2"))
        task3 = tg.create_task(fetch("url3"))
    # All tasks are complete here
    print(task1.result(), task2.result(), task3.result())
asyncio.run(main())

Semaphore (Limit Concurrency)
import asyncio

# Allow at most three limited_task() bodies to run at the same time.
semaphore = asyncio.Semaphore(3)


async def limited_task(n):
    """Run a one-second job, but only while holding the shared semaphore."""
    async with semaphore:
        print(f"Task {n} starting")
        await asyncio.sleep(1)
        print(f"Task {n} done")


async def main():
    # Ten jobs are scheduled, but the semaphore lets only three overlap.
    tasks = [limited_task(i) for i in range(10)]
    await asyncio.gather(*tasks)
asyncio.run(main())

Locks
import asyncio

lock = asyncio.Lock()
counter = 0


async def increment():
    """Read-modify-write the global counter while holding the lock.

    Without the lock, the await in the middle would let another coroutine
    read the same stale value, and increments would be lost.
    """
    global counter
    async with lock:
        temp = counter
        await asyncio.sleep(0.01)
        counter = temp + 1


async def main():
    await asyncio.gather(*[increment() for _ in range(100)])
    print(counter)  # 100
asyncio.run(main())

Events
import asyncio

event = asyncio.Event()


async def waiter():
    """Block until the shared event is set, then report it."""
    print("Waiting for event...")
    await event.wait()
    print("Event received!")


async def setter():
    """Set the shared event after a two-second delay, releasing all waiters."""
    await asyncio.sleep(2)
    print("Setting event")
    event.set()


async def main():
    await asyncio.gather(waiter(), setter())
asyncio.run(main())

Queues
import asyncio


async def producer(queue):
    """Put 0..4 on the queue, then a None sentinel to signal completion."""
    for i in range(5):
        await queue.put(i)
        print(f"Produced {i}")
        await asyncio.sleep(0.1)
    await queue.put(None)  # Sentinel: tells the consumer to stop


async def consumer(queue):
    """Pop and report items until the None sentinel arrives."""
    while True:
        item = await queue.get()
        if item is None:
            break
        print(f"Consumed {item}")
        queue.task_done()


async def main():
    queue = asyncio.Queue()
    await asyncio.gather(
        producer(queue),
        consumer(queue)
    )
asyncio.run(main())

Running in Executor (Blocking Code)
import asyncio
import time


def blocking_io():
    """A synchronous function that blocks its thread (simulated blocking I/O)."""
    time.sleep(1)
    return "result"


def cpu_bound():
    """CPU-heavy computation that would starve the event loop if run inline."""
    return sum(i * i for i in range(10_000_000))


async def main():
    # get_running_loop() is the supported way to reach the loop from inside
    # a coroutine; get_event_loop() here is deprecated since Python 3.10.
    loop = asyncio.get_running_loop()
    # Run blocking I/O in the default thread pool so the loop stays responsive
    result = await loop.run_in_executor(None, blocking_io)
    print(result)
    # Run CPU-bound work in a process pool to sidestep the GIL
    import concurrent.futures
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_bound)
        print(result)
asyncio.run(main())

Async Context Managers
import asyncio


class AsyncResource:
    """A resource whose acquire/release steps are themselves awaitable."""

    async def __aenter__(self):
        # Entered by `async with`; may await (e.g. opening a connection)
        print("Acquiring resource")
        await asyncio.sleep(0.1)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Runs on exit, even if the body raised; may await cleanup
        print("Releasing resource")
        await asyncio.sleep(0.1)

    async def do_work(self):
        print("Working")


async def main():
    async with AsyncResource() as resource:
        await resource.do_work()
asyncio.run(main())

Async Iterators
import asyncio


class AsyncRange:
    """Async iterator yielding 0..stop-1 with a short pause before each value."""

    def __init__(self, stop):
        self.stop = stop      # exclusive upper bound
        self.current = 0      # next value to yield

    def __aiter__(self):
        # `async for` calls this once; this object is its own iterator
        return self

    async def __anext__(self):
        if self.current >= self.stop:
            raise StopAsyncIteration  # ends the async for loop
        await asyncio.sleep(0.1)
        value = self.current
        self.current += 1
        return value


async def main():
    async for i in AsyncRange(5):
        print(i)
asyncio.run(main())

Async Generators
import asyncio


async def async_range(stop):
    """Async generator: yield 0..stop-1, pausing briefly before each value."""
    for i in range(stop):
        await asyncio.sleep(0.1)
        yield i


async def main():
    async for i in async_range(5):
        print(i)
asyncio.run(main())

HTTP Example (aiohttp)
import asyncio
import aiohttp


async def fetch(session, url):
    """GET `url` using the shared session and return the response body as text."""
    async with session.get(url) as response:
        return await response.text()


async def main():
    # One ClientSession is shared across all requests (connection pooling)
    async with aiohttp.ClientSession() as session:
        urls = [
            "https://example.com",
            "https://httpbin.org/get",
        ]
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        for url, result in zip(urls, results):
            print(f"{url}: {len(result)} bytes")
asyncio.run(main())

Common Patterns
import asyncio


# Retry pattern: re-run a coroutine factory, sleeping between attempts.
async def retry(coro_func, retries=3, delay=1):
    """Await coro_func(); on failure retry, re-raising after the final attempt.

    `coro_func` must be a zero-argument callable returning a fresh awaitable
    (a coroutine cannot be awaited twice).
    """
    for attempt in range(retries):
        try:
            return await coro_func()
        except Exception:
            if attempt == retries - 1:
                raise  # out of attempts: propagate the last error
            await asyncio.sleep(delay)


# Batch processing: handle `items` in concurrent chunks of `batch_size`.
# (`process` is a placeholder for the per-item coroutine.)
async def process_batch(items, batch_size=10):
    for i in range(0, len(items), batch_size):
        batch = items[i:i + batch_size]
        await asyncio.gather(*[process(item) for item in batch])


# Rate limiting: allow at most `rate` acquisitions per rolling second.
class RateLimiter:
    def __init__(self, rate):
        self.rate = rate
        self.semaphore = asyncio.Semaphore(rate)

    async def acquire(self):
        await self.semaphore.acquire()
        # Schedule the permit to be handed back one second from now
        asyncio.create_task(self._release_later())

    async def _release_later(self):
        await asyncio.sleep(1)
        self.semaphore.release()

Best Practices
# NOTE(review): illustrative fragments, not a runnable script — the bare
# `await`s below are only valid inside an `async def` (or the asyncio REPL).
# Use asyncio.run() as entry point
asyncio.run(main())
# Prefer gather for concurrent tasks
await asyncio.gather(task1(), task2())
# Use TaskGroup (3.11+) for better error handling
async with asyncio.TaskGroup() as tg:
tg.create_task(task1())
tg.create_task(task2())
# Set timeouts for external calls
await asyncio.wait_for(external_call(), timeout=30)
# Use semaphore to limit concurrency
async with semaphore:
await make_request()
# Run blocking code in executor
await loop.run_in_executor(None, blocking_func)
await loop.run_in_executor(None, blocking_func)

asyncio excels at I/O-bound concurrent code. Use it for network requests, file I/O, and any task that spends time waiting.
React to this post: