I've spent an embarrassing number of hours debugging async Python code. Not the fun kind of debugging where you learn elegant solutions—the frustrating kind where the bug is something you did three hours ago and forgot about.
This post is the guide I wish I'd had. The bugs that bit me hardest, the tools that actually helped, and the patterns I now use to avoid repeating my mistakes.
## The Bugs That Got Me

### The Forgotten Await
This one still gets me. You call an async function, forget the await, and nothing happens. Or worse—something happens, but not what you expected.
```python
import asyncio

async def fetch_user(user_id):
    await asyncio.sleep(0.1)  # Simulates a network call
    return {"id": user_id, "name": "Owen"}

async def main():
    user = fetch_user(123)  # Missing await!
    print(f"Got user: {user}")
    # Output: Got user: <coroutine object fetch_user at 0x...>
```

The coroutine object prints fine. Your code keeps running. But you've got a coroutine, not a user. And somewhere down the line, you'll wonder why `user["name"]` throws a TypeError.
The really insidious version is when you forget await in a function that doesn't use the return value:
```python
async def log_event(event):
    await db.insert(event)

async def handle_request(request):
    log_event({"type": "request", "data": request})  # Oops
    # The event is never logged. No error. No warning.
    return process(request)
```

Python does warn here—a `RuntimeWarning: coroutine 'log_event' was never awaited` fires when the coroutine object is garbage-collected. But the warning goes to stderr, and in production logs, stderr gets noisy. The bug can hide for weeks.
How I catch it now:
Run with PYTHONASYNCIODEBUG=1 during development. Debug mode adds the traceback of where the unawaited coroutine was created, which turns the warning into something actionable. Or use a linter—Ruff catches some await issues if you enable the right rules.
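You can also promote the warning to a hard failure in tests. A minimal sketch using the stdlib `warnings` module to record the `RuntimeWarning` that fires when an unawaited coroutine is garbage-collected:

```python
import gc
import warnings

async def fetch_user(user_id):
    return {"id": user_id}

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    coro = fetch_user(123)  # created but never awaited
    del coro                # deallocation triggers the RuntimeWarning
    gc.collect()

print(any("never awaited" in str(w.message) for w in caught))  # True
```

In a pytest suite, `filterwarnings = error::RuntimeWarning` in the config gets you the same effect: forgotten awaits fail the build instead of scrolling past in stderr.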
### The Blocking Call
This is the silent killer of async performance. You write async code, everything's concurrent, you feel good. Then someone adds a requests.get() or a time.sleep() and your entire event loop freezes.
```python
import asyncio
import requests  # Sync library!

async def fetch_external_data():
    # This blocks the entire event loop for 2 seconds
    response = requests.get("https://slow-api.example.com")
    return response.json()

async def main():
    # These run sequentially, not concurrently
    results = await asyncio.gather(
        fetch_external_data(),
        fetch_external_data(),
        fetch_external_data(),
    )
    # Takes 6+ seconds, not 2 seconds
```

The code looks async. The function is async. But `requests.get()` is synchronous. While it's waiting for the network, no other coroutine can run. Your "concurrent" gather runs sequentially.
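One escape hatch: `asyncio.to_thread` (Python 3.9+) pushes the sync call into a worker thread so the event loop stays free. A sketch, with `time.sleep` standing in for the blocking call:

```python
import asyncio
import time

def blocking_fetch(n):
    # Stand-in for a sync call like requests.get()
    time.sleep(0.2)
    return n

async def main():
    # Each call runs in the default thread pool, so the three
    # sleeps overlap instead of serializing on the event loop
    start = time.monotonic()
    results = await asyncio.gather(
        asyncio.to_thread(blocking_fetch, 1),
        asyncio.to_thread(blocking_fetch, 2),
        asyncio.to_thread(blocking_fetch, 3),
    )
    elapsed = time.monotonic() - start
    return results, elapsed

results, elapsed = asyncio.run(main())
print(results)        # [1, 2, 3]
print(elapsed < 0.5)  # True: roughly 0.2s total, not 0.6s
```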
How I catch it now:
I use asyncio's debug mode to detect slow callbacks:
```python
import asyncio

asyncio.run(main(), debug=True)
```

When debug mode is on, any callback that blocks for more than 100ms (configurable) gets a warning:

```
Executing <Task ...> took 2.003 seconds
```
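The 100ms threshold lives on the loop as `slow_callback_duration`. A minimal sketch that trips the warning on purpose—the deliberate `time.sleep` plays the role of the bug:

```python
import asyncio
import time

async def main():
    # Lower the threshold from the default 0.1s to 50ms
    asyncio.get_running_loop().slow_callback_duration = 0.05
    time.sleep(0.1)  # deliberate blocking call: triggers the warning

# With debug=True, the asyncio logger reports something like:
#   Executing <Task ... main() ...> took 0.1xx seconds
asyncio.run(main(), debug=True)
```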
The other approach: grep your async codebase for known blocking calls—`time.sleep`, `requests.`, bare `open()`. It's crude but catches obvious issues.
For legitimate blocking code you can't avoid (like CPU-heavy work), use an executor:
```python
import asyncio

async def process_image(image_data):
    loop = asyncio.get_running_loop()
    # Run blocking code in a thread pool
    result = await loop.run_in_executor(
        None,  # Default ThreadPoolExecutor
        heavy_image_processing,
        image_data,
    )
    return result
```

### Race Conditions
Async code runs concurrently. Concurrent code has race conditions. I keep learning this lesson.
```python
import asyncio

balance = 100

async def withdraw(amount):
    global balance
    if balance >= amount:
        await asyncio.sleep(0.01)  # Simulates a DB call
        balance -= amount
        return True
    return False

async def main():
    # Both check balance (100), both pass, both withdraw
    results = await asyncio.gather(
        withdraw(80),
        withdraw(80),
    )
    print(f"Results: {results}")  # [True, True]
    print(f"Balance: {balance}")  # -60 (!)
```

Both withdrawals check the balance before either modifies it. Both see 100. Both succeed. You're now at -60.
The fix is obvious in hindsight: use a lock.
```python
balance_lock = asyncio.Lock()

async def withdraw(amount):
    global balance
    async with balance_lock:
        if balance >= amount:
            await asyncio.sleep(0.01)
            balance -= amount
            return True
        return False
```

But races are rarely this obvious in real code. They hide in the interaction between async functions. They depend on timing, so they work locally and fail in production. They're intermittent, so you can't reproduce them reliably.
How I catch them now:
Mostly by being paranoid. Any time I'm reading and writing shared state across an await, I ask: could another coroutine modify this between the read and write? If yes, lock.
Also: make state immutable where possible. Pass data instead of sharing it. The race condition above disappears if withdraw takes a balance and returns a new one instead of mutating a global.
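For instance, this sketch makes `withdraw` a pure function—the caller owns the balance, so there is no shared state to race on:

```python
def withdraw(balance, amount):
    # Pure function: takes a balance, returns a new one
    if balance >= amount:
        return True, balance - amount
    return False, balance

ok1, balance = withdraw(100, 80)     # succeeds, balance becomes 20
ok2, balance = withdraw(balance, 80)  # refused: 20 < 80
print(ok1, ok2, balance)  # True False 20
```

No lock needed—concurrent callers each work on their own value, and the serialization point moves to wherever you commit the new balance.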
## Tools That Actually Help

### asyncio Debug Mode
I mentioned this earlier, but it deserves its own section. Run with debug=True:
```python
asyncio.run(main(), debug=True)
```

Or set the environment variable:

```shell
PYTHONASYNCIODEBUG=1 python my_script.py
```

What you get:
- Warnings for unawaited coroutines
- Warnings for slow callbacks (>100ms by default)
- Better tracebacks when something fails
The tracebacks are the underrated feature. Without debug mode, async tracebacks are a maze. With it, they're still a maze, but with more signposts.
### Logging with Context
Standard logging doesn't know about tasks. You get interleaved output from concurrent tasks with no indication which task logged what.
```python
import asyncio
import logging

logging.basicConfig(level=logging.INFO)

async def worker(name):
    logging.info("Starting work")
    await asyncio.sleep(0.1)
    logging.info("Finished work")

async def main():
    await asyncio.gather(worker("A"), worker("B"), worker("C"))
```

Output:

```
INFO:root:Starting work
INFO:root:Starting work
INFO:root:Starting work
INFO:root:Finished work
INFO:root:Finished work
INFO:root:Finished work
```
Which "Starting work" is which? No idea.
Fix it by including task identity:
```python
import asyncio
import logging

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(task)s] %(message)s"
)

class TaskFilter(logging.Filter):
    def filter(self, record):
        try:
            task = asyncio.current_task()
            record.task = task.get_name() if task else "main"
        except RuntimeError:
            # No running event loop (e.g. logging from sync code)
            record.task = "no-loop"
        return True

logger = logging.getLogger()
logger.addFilter(TaskFilter())

async def worker(name):
    asyncio.current_task().set_name(name)
    logging.info("Starting work")
    await asyncio.sleep(0.1)
    logging.info("Finished work")
```

Now each log line includes the task name. You can trace what's happening.
### Python 3.11+ Tracebacks
If you're on Python 3.11 or later, you get much better async tracebacks out of the box. Earlier versions lost context at await boundaries—you'd see where the exception was raised but not the full chain of calls that got you there.
3.11 added "exception groups" and better traceback handling. TaskGroup in particular gives you grouped exceptions when multiple tasks fail:
```python
import asyncio

async def failing_task(n):
    await asyncio.sleep(0.1)
    raise ValueError(f"Task {n} failed")

async def main():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(failing_task(1))
        tg.create_task(failing_task(2))
        tg.create_task(failing_task(3))
```

You get all three exceptions in one traceback, not just the first. This is genuinely useful for debugging concurrent failures.
### Quick Debugging Print Pattern
When I need to quickly understand task execution order:
```python
import asyncio
from datetime import datetime

def debug_log(msg):
    try:
        task = asyncio.current_task()
    except RuntimeError:
        task = None  # Called outside a running event loop
    task_name = task.get_name() if task else "main"
    timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
    print(f"[{timestamp}] [{task_name}] {msg}")

async def my_function():
    debug_log("entering")
    await some_operation()
    debug_log("after some_operation")
    result = await another_operation()
    debug_log(f"got result: {result}")
    return result
```

Not elegant. But when you're chasing a timing bug at 11 PM, simple wins.
## Patterns That Save Me

### Structured Concurrency (TaskGroup)
Before Python 3.11, I used gather for everything. Tasks would escape their scope. Exceptions would get lost. Cancellation was a mess.
TaskGroup fixes this:
```python
async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(fetch_user(1))
        task2 = tg.create_task(fetch_orders(1))
        task3 = tg.create_task(fetch_recommendations())
    # All tasks complete (or fail) before we get here.
    # If any task fails, the others are cancelled and
    # all exceptions are raised together.
```

The key insight: tasks are scoped to the context manager. When you exit the block, all tasks are done. No zombie tasks. No forgotten coroutines. No "where did that exception go?"
I now use TaskGroup by default and only drop to gather when I need specific behavior (like return_exceptions=True).
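For reference, the gather behavior I still occasionally need—`return_exceptions=True` turns failures into return values instead of raising:

```python
import asyncio

async def flaky(n):
    if n == 2:
        raise ValueError("boom")
    return n

async def main():
    # Errors come back as values, so one failure
    # doesn't prevent collecting the other results
    return await asyncio.gather(
        flaky(1), flaky(2), flaky(3),
        return_exceptions=True,
    )

results = asyncio.run(main())
print(results)  # [1, ValueError('boom'), 3]
```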
### Proper Cancellation Handling
Cancellation is part of normal async flow, not an error. Your code should handle it gracefully.
```python
async def resilient_worker():
    try:
        while True:
            await do_work()
    except asyncio.CancelledError:
        # Cleanup! This is your chance to close connections,
        # flush buffers, etc.
        await cleanup()
        raise  # Re-raise to complete the cancellation
```

The crucial part: re-raise CancelledError. If you swallow it, the task isn't actually cancelled. It keeps running. This breaks TaskGroup, timeouts, and anything else relying on cancellation.
```python
# DON'T DO THIS
async def bad_worker():
    try:
        await some_operation()
    except asyncio.CancelledError:
        print("Cancelled, but I'll keep going!")  # Bad!
        # Swallowing the error breaks cancellation

# DO THIS
async def good_worker():
    try:
        await some_operation()
    except asyncio.CancelledError:
        print("Cancelled, cleaning up")
        await cleanup()
        raise  # Always re-raise!
```

### Timeouts Everywhere
Network calls fail. External services hang. Without timeouts, your async code waits forever.
```python
async def fetch_with_timeout(url, timeout=30):
    try:
        async with asyncio.timeout(timeout):  # Python 3.11+
            return await fetch(url)
    except TimeoutError:
        logging.warning(f"Timeout fetching {url}")
        return None
```

For pre-3.11:
```python
async def fetch_with_timeout(url, timeout=30):
    try:
        return await asyncio.wait_for(fetch(url), timeout=timeout)
    except asyncio.TimeoutError:
        logging.warning(f"Timeout fetching {url}")
        return None
```

I set timeouts on anything external. API calls, database queries, file operations. The timeout value is always explicit, never infinite.
### Shielding Critical Operations
Sometimes you have an operation that must complete even if the parent task is cancelled. Use shield:
```python
async def save_critical_data(data):
    # This write should complete even if we're cancelled
    await asyncio.shield(database.write(data))
```

Use this sparingly. If everything is shielded, cancellation stops working. But for critical operations—closing connections gracefully, flushing audit logs, completing a financial transaction—it's essential.
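Here's a self-contained sketch of the behavior, with a toy `critical_write` in place of a real database call: cancelling the outer task raises CancelledError at the `await`, but the shielded inner task runs to completion.

```python
import asyncio

log = []

async def critical_write():
    await asyncio.sleep(0.05)
    log.append("written")

async def worker():
    # Cancelling worker() raises CancelledError at this await,
    # but the shielded inner task keeps running
    await asyncio.shield(critical_write())

async def main():
    task = asyncio.create_task(worker())
    await asyncio.sleep(0.01)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    await asyncio.sleep(0.1)  # give the shielded write time to finish

asyncio.run(main())
print(log)  # ['written']
```

Note the caveat baked into the sketch: the event loop has to stay alive long enough for the shielded operation to finish, which is exactly why shield pairs well with graceful-shutdown logic.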
## What I Wish I'd Learned Sooner
Async debugging is harder than sync debugging. There's no getting around it. Concurrent execution means more state, more timing dependencies, more ways for things to go wrong.
But the tools and patterns help. Debug mode catches the obvious mistakes. Logging with task context makes interleaved output readable. TaskGroup keeps concurrency bounded and failures visible.
The biggest lesson: be paranoid about shared state. Every await is a point where other code can run. If you're holding state across an await, ask whether another coroutine could interfere. Usually the answer is yes, and you need a lock.
Async Python is powerful. It's also a different mindset than synchronous code. The bugs are different, the debugging is different, and the patterns are different. Accept that, and the hard-won lessons become fewer and further between.
At least, that's been my experience. 600 hours of async debugging later, I mostly know what to look for. Mostly.