The first time I saw a with statement, I thought it was just a cleaner way to handle files. It is—but it's so much more. Context managers are one of Python's most elegant patterns for resource management. Here's everything I've learned about them.
What Problem Do Context Managers Solve?
Before context managers, handling resources looked like this:
file = open("data.txt")
try:
    data = file.read()
    process(data)
finally:
    file.close()
Forget the finally? You've got a resource leak. And this pattern repeats everywhere: files, locks, database connections, network sockets. It's tedious and error-prone.
Context managers encapsulate this setup/teardown pattern:
with open("data.txt") as file:
    data = file.read()
    process(data)
# File is automatically closed, even if an exception occurs
The with statement guarantees cleanup. It's not just shorter code; it's safer code.
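A quick way to check the cleanup guarantee is to raise inside the block and inspect the file object afterwards. This is a minimal sketch; the scratch file and the names handle and closed_after_error are made up for the demonstration.

```python
import os
import tempfile

# Hypothetical scratch file, created only for this demonstration.
fd, path = tempfile.mkstemp(suffix=".txt")
os.close(fd)

handle = None
try:
    with open(path, "w") as f:
        handle = f
        f.write("partial")
        raise RuntimeError("simulated failure inside the block")
except RuntimeError:
    pass

# The with statement closed the file before the exception reached us.
closed_after_error = handle.closed
os.remove(path)
```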
The Protocol: __enter__ and __exit__
Any object with __enter__ and __exit__ methods is a context manager. Here's how it works:
class ManagedResource:
    def __enter__(self):
        print("Acquiring resource")
        return self  # This becomes the 'as' variable

    def __exit__(self, exc_type, exc_val, exc_tb):
        print("Releasing resource")
        return False  # Don't suppress exceptions

with ManagedResource() as resource:
    print("Using resource")
# Output:
# Acquiring resource
# Using resource
# Releasing resource
How the Protocol Works
When you write with X as Y:
- Python calls X.__enter__() and assigns the result to Y
- The body of the with block runs
- Python calls X.__exit__(exc_type, exc_val, exc_tb), passing exception info (or None if no exception)
The key insight: once __enter__ has returned successfully, __exit__ always runs, even if the body raises an exception. This is the guarantee that makes context managers so powerful.
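The steps above can be approximated in plain Python. This is a simplified sketch, not the interpreter's exact behavior (the real statement looks the methods up on the type, among other details). Resource, run_with, and failing_body are hypothetical names used only for illustration.

```python
import sys

class Resource:
    """Hypothetical resource that records the order of protocol calls."""
    def __init__(self):
        self.events = []

    def __enter__(self):
        self.events.append("enter")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        name = exc_type.__name__ if exc_type else None
        self.events.append(f"exit({name})")
        return False  # do not suppress

def run_with(manager, body):
    """Rough expansion of `with manager as value: body(value)`."""
    value = manager.__enter__()
    try:
        body(value)
    except BaseException:
        # Pass the active exception to __exit__; re-raise unless it returns a truthy value.
        if not manager.__exit__(*sys.exc_info()):
            raise
    else:
        manager.__exit__(None, None, None)

def failing_body(resource):
    resource.events.append("body")
    raise ValueError("boom")

res = Resource()
try:
    run_with(res, failing_body)
except ValueError:
    res.events.append("propagated")
# res.events == ["enter", "body", "exit(ValueError)", "propagated"]
```

Note that __exit__ is only reached after __enter__ has returned, which is why a failure during setup does not trigger cleanup.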
A Complete Example
class Timer:
    def __enter__(self):
        import time
        self.start = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        import time
        self.elapsed = time.perf_counter() - self.start
        print(f"Elapsed: {self.elapsed:.3f}s")
        return False

with Timer() as t:
    # Do some work
    sum(range(1_000_000))
# Output: Elapsed: 0.032s
Exception Handling in __exit__
The three arguments to __exit__ tell you about any exception that occurred:
class ExceptionDemo:
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is None:
            print("No exception")
        else:
            print(f"Exception: {exc_type.__name__}: {exc_val}")
        return False  # Let the exception propagate

# No exception
with ExceptionDemo():
    pass
# Output: No exception

# With exception
with ExceptionDemo():
    raise ValueError("oops")
# Output: Exception: ValueError: oops
# Then the exception propagates
Suppressing Exceptions
If __exit__ returns True, the exception is suppressed:
class Suppressor:
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is ValueError:
            print(f"Suppressing: {exc_val}")
            return True  # Suppress ValueError
        return False  # Let other exceptions propagate

with Suppressor():
    raise ValueError("this is fine")
print("Continues normally")
# Output:
# Suppressing: this is fine
# Continues normally
This is useful for optional cleanup or handling expected errors. But use it sparingly: silently swallowing exceptions can hide bugs.
The contextlib Module
Writing classes for every context manager gets tedious. The contextlib module provides shortcuts.
@contextmanager: The Generator Shortcut
The most useful tool is @contextmanager. It lets you write context managers as generators:
from contextlib import contextmanager

@contextmanager
def timer():
    import time
    start = time.perf_counter()
    yield  # This is where the 'with' body runs
    elapsed = time.perf_counter() - start
    print(f"Elapsed: {elapsed:.3f}s")

with timer():
    sum(range(1_000_000))
The pattern is simple:
- Setup code before yield
- yield the value (or nothing) for the as variable
- Cleanup code after yield
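To see how a generator maps onto the protocol, here is a deliberately simplified stand-in for the wrapper that @contextmanager builds. It is a sketch under assumptions, not the real contextlib implementation, which handles more edge cases. SimpleGeneratorCM, tracked, and log are made-up names.

```python
class SimpleGeneratorCM:
    """Simplified illustration of a generator-backed context manager."""
    def __init__(self, gen):
        self.gen = gen

    def __enter__(self):
        # Run the setup code up to the yield; the yielded value feeds 'as'.
        return next(self.gen)

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is None:
            # Resume after the yield so the cleanup code runs.
            try:
                next(self.gen)
            except StopIteration:
                return False
            raise RuntimeError("generator didn't stop")
        # Re-raise the body's exception at the yield point.
        try:
            self.gen.throw(exc_val)
        except StopIteration:
            return True  # the generator swallowed the exception
        except BaseException as raised:
            if raised is exc_val:
                return False  # same exception came back; let it propagate
            raise
        raise RuntimeError("generator didn't stop after throw()")

log = []

def tracked():
    log.append("setup")
    try:
        yield "value"
    finally:
        log.append("cleanup")

with SimpleGeneratorCM(tracked()) as v:
    log.append(f"body got {v}")

try:
    with SimpleGeneratorCM(tracked()):
        raise KeyError("x")
except KeyError:
    log.append("propagated")
```

Because the body's exception is raised at the yield, any cleanup placed after a bare yield is skipped unless it sits in a finally block.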
Yielding a Value
@contextmanager
def open_file(path, mode="r"):
    file = open(path, mode)
    try:
        yield file  # This becomes the 'as' variable
    finally:
        file.close()

with open_file("data.txt") as f:
    content = f.read()
Always use try/finally in @contextmanager! Without it, exceptions in the body skip your cleanup code:
# BAD - no cleanup on exception
@contextmanager
def bad_resource():
    resource = acquire()
    yield resource
    release(resource)  # Skipped if exception in body!

# GOOD - cleanup guaranteed
@contextmanager
def good_resource():
    resource = acquire()
    try:
        yield resource
    finally:
        release(resource)  # Always runs
closing: For Objects with close()
If an object has a close() method but isn't a context manager:
from contextlib import closing
from urllib.request import urlopen

with closing(urlopen("https://example.com")) as page:
    html = page.read()
# page.close() called automatically
suppress: Ignore Specific Exceptions
import os
from contextlib import suppress

# Instead of:
try:
    os.remove("file.txt")
except FileNotFoundError:
    pass

# Use:
with suppress(FileNotFoundError):
    os.remove("file.txt")
Cleaner for optional operations where you don't care if they fail.
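One detail worth keeping in mind: when suppress catches an exception, the rest of the with block is skipped and execution resumes after the block. A minimal sketch (steps is an illustrative name):

```python
from contextlib import suppress

steps = []
with suppress(KeyError):
    steps.append("before")
    {}["missing"]          # raises KeyError
    steps.append("after")  # never reached
steps.append("continued")
# steps == ["before", "continued"]
```

So it is safest to keep the suppressed block down to the single operation that is allowed to fail.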
redirect_stdout and redirect_stderr
Capture or redirect output:
from contextlib import redirect_stdout
from io import StringIO

# Capture stdout
buffer = StringIO()
with redirect_stdout(buffer):
    print("This goes to buffer")
captured = buffer.getvalue()  # "This goes to buffer\n"

# Redirect to file
with open("log.txt", "w") as f:
    with redirect_stdout(f):
        print("This goes to file")
nullcontext: A No-Op Context Manager
Useful when you conditionally need a context manager:
import threading
from contextlib import nullcontext

def process(path, use_lock=True):
    lock = threading.Lock() if use_lock else None
    with lock if lock else nullcontext():
        do_work(path)
nullcontext has been available since Python 3.7. I often use this for optional features:
@contextmanager
def maybe_timer(enabled=True):
    if enabled:
        import time
        start = time.perf_counter()
        try:
            yield
        finally:
            print(f"Elapsed: {time.perf_counter() - start:.3f}s")
    else:
        yield

# Or just use nullcontext
timer_ctx = timer() if debug else nullcontext()
with timer_ctx:
    do_work()
Async Context Managers
Async code has its own context manager protocol: __aenter__ and __aexit__.
The Async Protocol
import asyncio

class AsyncResource:
    async def __aenter__(self):
        print("Acquiring async resource")
        await asyncio.sleep(0.1)  # Simulate async setup
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Releasing async resource")
        await asyncio.sleep(0.1)  # Simulate async cleanup
        return False

async def main():
    async with AsyncResource() as resource:
        print("Using resource")
        await asyncio.sleep(0.1)

asyncio.run(main())
@asynccontextmanager
Just like @contextmanager, but for async:
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def async_timer():
    import time
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        print(f"Elapsed: {elapsed:.3f}s")

async def main():
    async with async_timer():
        await asyncio.sleep(1)
Async Database Connection Pool
A realistic example:
@asynccontextmanager
async def get_connection(pool):
    conn = await pool.acquire()
    try:
        yield conn
    finally:
        await pool.release(conn)

async def main():
    pool = await create_pool()
    async with get_connection(pool) as conn:
        result = await conn.fetch("SELECT * FROM users")
aclosing: Async Version of closing
from contextlib import aclosing  # Python 3.10+

async def main():
    # async with is only valid inside a coroutine
    async with aclosing(async_generator()) as gen:
        async for item in gen:
            process(item)
    # Generator properly closed
Practical Examples
Here's where context managers really shine—real-world resource management.
File Handling (The Classic)
# The built-in open() is a context manager
with open("data.txt", "w") as f:
    f.write("Hello, World!")
# File closed automatically, even on exception

# Custom atomic write
@contextmanager
def atomic_write(path):
    """Write to temp file, rename on success."""
    import os
    import tempfile
    dir_name = os.path.dirname(path) or "."
    fd, temp_path = tempfile.mkstemp(dir=dir_name)
    try:
        with os.fdopen(fd, "w") as f:
            yield f
        os.replace(temp_path, path)  # Atomic on POSIX
    except BaseException:
        os.unlink(temp_path)  # Clean up temp file
        raise

with atomic_write("config.json") as f:
    json.dump(config, f)
# Either fully written or original untouched
Locks and Synchronization
import threading

lock = threading.Lock()

# Manual way (don't do this)
lock.acquire()
try:
    shared_resource.update()
finally:
    lock.release()

# Context manager way (do this)
with lock:
    shared_resource.update()
Custom lock with timeout:
@contextmanager
def acquire_timeout(lock, timeout):
    acquired = lock.acquire(timeout=timeout)
    if not acquired:
        raise TimeoutError("Could not acquire lock")
    try:
        yield
    finally:
        lock.release()

with acquire_timeout(lock, timeout=5.0):
    critical_section()
Database Connections
@contextmanager
def get_db_connection(config):
    import psycopg2
    conn = psycopg2.connect(**config)
    try:
        yield conn
        conn.commit()  # Commit on success
    except:
        conn.rollback()  # Rollback on error
        raise
    finally:
        conn.close()

with get_db_connection(DB_CONFIG) as conn:
    cursor = conn.cursor()
    cursor.execute("INSERT INTO users (name) VALUES (%s)", ("Alice",))
# Committed if successful, rolled back if not
Database Transactions
@contextmanager
def transaction(conn):
    """Wrap operations in a transaction."""
    try:
        yield conn
        conn.commit()
    except:
        conn.rollback()
        raise

with get_db_connection(config) as conn:
    with transaction(conn):
        cursor = conn.cursor()
        cursor.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
        cursor.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
# Both updates committed together, or neither
Timing and Profiling
@contextmanager
def timer(name=""):
    import time
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        label = f" [{name}]" if name else ""
        print(f"Elapsed{label}: {elapsed:.4f}s")

with timer("data processing"):
    process_large_dataset()
# Output: Elapsed [data processing]: 2.3456s
Reusable timer class:
class Timer:
    def __init__(self, name=""):
        self.name = name
        self.elapsed = 0

    def __enter__(self):
        import time
        self.start = time.perf_counter()
        return self

    def __exit__(self, *args):
        import time
        self.elapsed = time.perf_counter() - self.start
        return False

with Timer("query") as t:
    run_query()
print(f"Query took {t.elapsed:.3f}s")
Temporary State Changes
@contextmanager
def temp_chdir(path):
    """Temporarily change working directory."""
    import os
    old_dir = os.getcwd()
    os.chdir(path)
    try:
        yield
    finally:
        os.chdir(old_dir)

with temp_chdir("/tmp"):
    # Working in /tmp
    process_temp_files()
# Back to original directory

@contextmanager
def temp_env(**env_vars):
    """Temporarily set environment variables."""
    import os
    old_values = {}
    for key, value in env_vars.items():
        old_values[key] = os.environ.get(key)
        if value is None:
            os.environ.pop(key, None)
        else:
            os.environ[key] = value
    try:
        yield
    finally:
        for key, old_value in old_values.items():
            if old_value is None:
                os.environ.pop(key, None)
            else:
                os.environ[key] = old_value

with temp_env(DEBUG="1", API_KEY="test_key"):
    run_tests()
# Environment restored
Mocking and Testing
import time

@contextmanager
def mock_time(frozen_time):
    """Freeze time for testing."""
    original = time.time
    time.time = lambda: frozen_time
    try:
        yield
    finally:
        time.time = original

with mock_time(1609459200):  # 2021-01-01 00:00:00 UTC
    assert time.time() == 1609459200
Nested Context Managers
Multiple context managers can be nested.
Multiple with Statements
with open("input.txt") as infile:
    with open("output.txt", "w") as outfile:
        outfile.write(infile.read().upper())
Single Line (Python 3.1+)
with open("input.txt") as infile, open("output.txt", "w") as outfile:
    outfile.write(infile.read().upper())
Parenthesized (Python 3.10+)
For better formatting:
with (
    open("input.txt") as infile,
    open("output.txt", "w") as outfile,
    Timer("processing") as t,
):
    outfile.write(infile.read().upper())
Order Matters
Context managers are entered in order and exited in reverse order (like a stack):
@contextmanager
def announce(name):
    print(f"Entering {name}")
    yield
    print(f"Exiting {name}")

with announce("A"), announce("B"), announce("C"):
    print("In body")
# Output:
# Entering A
# Entering B
# Entering C
# In body
# Exiting C
# Exiting B
# Exiting A
ExitStack: Dynamic Context Management
What if you don't know how many context managers you need until runtime? ExitStack handles this.
Basic Usage
from contextlib import ExitStack

with ExitStack() as stack:
    files = [stack.enter_context(open(f)) for f in filenames]
    # All files open
    for f in files:
        process(f)
# All files closed
Why ExitStack?
Without ExitStack, opening a dynamic number of files is awkward:
# Without ExitStack - ugly and error-prone
files = []
try:
    for filename in filenames:
        files.append(open(filename))
    # Process files
finally:
    for f in files:
        f.close()  # What if one close() fails?
ExitStack keeps unwinding even if an individual close operation fails: the remaining cleanups still run, and the error is raised once they have finished.
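A small experiment illustrates that behavior. FlakyHandle is a hypothetical stand-in for a resource whose close() can fail; the rest uses only ExitStack.callback from the standard library.

```python
from contextlib import ExitStack

class FlakyHandle:
    """Hypothetical resource whose close() may raise."""
    def __init__(self, name, fail=False):
        self.name = name
        self.fail = fail
        self.closed = False

    def close(self):
        self.closed = True
        if self.fail:
            raise OSError(f"could not close {self.name}")

handles = [FlakyHandle("a"), FlakyHandle("b", fail=True), FlakyHandle("c")]
error = None
try:
    with ExitStack() as stack:
        for h in handles:
            stack.callback(h.close)  # cleanups run in reverse order on exit
except OSError as exc:
    error = exc

# Every handle was closed, and the failure from "b" still surfaced.
all_closed = all(h.closed for h in handles)
```

In the hand-written finally loop, the failure while closing "b" would have left "a" open.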
Callbacks
Register arbitrary cleanup functions:
with ExitStack() as stack:
    resource = acquire_resource()
    stack.callback(release_resource, resource)
    # More setup
    stack.callback(lambda: print("Cleanup 1"))
    stack.callback(lambda: print("Cleanup 2"))
    # Use resources
# Output:
# Cleanup 2
# Cleanup 1
# (resource released)
push: Conditional Cleanup
with ExitStack() as stack:
    conn = connect()
    stack.push(conn)  # Registers conn.__exit__ (does not call __enter__)
    if needs_transaction:
        trans = conn.begin()
        stack.push(trans)  # Registers trans.__exit__
    # Work with conn (and maybe trans)
pop_all: Transfer Ownership
Sometimes you want to start with managed cleanup, then take over:
def acquire_resources():
    with ExitStack() as stack:
        resource1 = stack.enter_context(Resource1())
        resource2 = stack.enter_context(Resource2())
        # All good, transfer ownership to caller
        return stack.pop_all(), resource1, resource2

# Caller now responsible for cleanup
exit_stack, r1, r2 = acquire_resources()
try:
    use_resources(r1, r2)
finally:
    exit_stack.close()
AsyncExitStack
For async context managers:
from contextlib import AsyncExitStack

async def main():
    async with AsyncExitStack() as stack:
        connections = [
            await stack.enter_async_context(get_connection(host))
            for host in hosts
        ]
        # All connections open
        await process_all(connections)
    # All connections closed
Real-World Example: Resource Pool
@contextmanager
def resource_pool(configs):
    """Open multiple resources, close all on exit."""
    with ExitStack() as stack:
        resources = []
        for config in configs:
            resource = stack.enter_context(Resource(config))
            resources.append(resource)
        yield resources

with resource_pool(configs) as resources:
    for r in resources:
        r.process()
Example: Multi-File Processing
def process_files(input_paths, output_path):
    with ExitStack() as stack:
        # Open all input files
        inputs = [stack.enter_context(open(p)) for p in input_paths]
        # Open output file
        output = stack.enter_context(open(output_path, "w"))
        # Merge inputs to output
        for infile in inputs:
            output.write(infile.read())
Patterns and Best Practices
Pattern 1: Setup/Teardown
The classic pattern—acquire then release:
@contextmanager
def managed_resource():
    resource = acquire()
    try:
        yield resource
    finally:
        release(resource)
Pattern 2: Exception Translation
Convert low-level exceptions to domain exceptions:
@contextmanager
def database_operation():
    try:
        yield
    except psycopg2.IntegrityError as e:
        raise DuplicateKeyError(str(e)) from e
    except psycopg2.OperationalError as e:
        raise DatabaseUnavailableError(str(e)) from e
Pattern 3: Logging and Tracing
@contextmanager
def trace(operation_name):
    import logging
    import uuid
    trace_id = str(uuid.uuid4())[:8]
    logger = logging.getLogger(__name__)
    logger.info(f"[{trace_id}] Starting: {operation_name}")
    try:
        yield trace_id
    except Exception as e:
        logger.error(f"[{trace_id}] Failed: {operation_name} - {e}")
        raise
    else:
        logger.info(f"[{trace_id}] Completed: {operation_name}")

with trace("user_signup") as trace_id:
    create_user(data)
Pattern 4: Optional Context Manager
@contextmanager
def optional_lock(lock, use_lock=True):
    if use_lock:
        with lock:
            yield
    else:
        yield

# Or simpler with nullcontext:
lock_ctx = lock if use_lock else nullcontext()
with lock_ctx:
    do_work()
Pattern 5: Reentrant Context Manager
Some contexts can be entered multiple times:
class ReentrantTimer:
    def __init__(self):
        self.depth = 0
        self.elapsed = 0

    def __enter__(self):
        if self.depth == 0:
            import time
            self.start = time.perf_counter()
        self.depth += 1
        return self

    def __exit__(self, *args):
        self.depth -= 1
        if self.depth == 0:
            import time
            self.elapsed = time.perf_counter() - self.start
        return False

timer = ReentrantTimer()
with timer:
    do_something()
    with timer:  # Nested entry
        do_more()
    # Still timing
# Now elapsed is set
Common Gotchas
Gotcha 1: Forgetting try/finally in @contextmanager
# WRONG - cleanup skipped on exception
@contextmanager
def bad():
    resource = acquire()
    yield resource
    release(resource)  # Never runs if exception!

# RIGHT
@contextmanager
def good():
    resource = acquire()
    try:
        yield resource
    finally:
        release(resource)
Gotcha 2: Generator Doesn't Yield
@contextmanager
def broken():
    if some_condition:
        return  # RuntimeError on entry: the generator must yield exactly once
    yield

# Fix:
@contextmanager
def fixed():
    if some_condition:
        yield None
        return
    yield something
Gotcha 3: Yielding in a Loop
# WRONG - a @contextmanager function must yield exactly once
@contextmanager
def wrong():
    for item in items:
        yield item  # A second yield raises RuntimeError when the block exits

# This is a generator, use it as such:
def right():
    for item in items:
        yield item
Gotcha 4: Suppressing Exceptions Unexpectedly
Any truthy return value from __exit__ swallows the exception, so make that decision explicit:
def __exit__(self, exc_type, exc_val, exc_tb):
    self.cleanup()
    return self.suppress_errors  # Be explicit!
Gotcha 5: Not Handling __exit__ Exceptions
If __exit__ raises, the new exception replaces the original one, which survives only as its __context__:
class Risky:
    def __exit__(self, *args):
        raise RuntimeError("Cleanup failed")  # Replaces the original exception!

# Better: log the cleanup failure and let the original propagate
def __exit__(self, exc_type, exc_val, exc_tb):
    try:
        self.cleanup()
    except Exception:
        if exc_type is None:
            raise  # No original exception, ok to raise
        logging.exception("Cleanup failed")  # Log, keep original
    return False
When to Use Context Managers
Use context managers for:
- Resources that need cleanup (files, connections, locks)
- Temporary state changes (directory, environment, config)
- Setup/teardown patterns (timing, tracing, mocking)
- Transaction boundaries (commit/rollback)
- Scoped behavior (locking, suppressing warnings)
Consider alternatives when:
- No cleanup is needed
- The "context" spans multiple functions (use dependency injection)
- You need to yield multiple values (use generators)
Summary
Context managers are Python's answer to the resource management problem. The key points:
- The protocol: __enter__ sets up, __exit__ tears down (always!)
- @contextmanager: Write context managers as generators
- contextlib: closing, suppress, redirect_stdout, nullcontext
- Async support: async with, @asynccontextmanager
- ExitStack: Manage dynamic numbers of contexts
- Always use try/finally in @contextmanager
Start simple with the with statement for files and locks. As you need more control, the full protocol and contextlib tools are there. Context managers make resource management both elegant and safe.