The first time I saw a with statement, I thought it was just a cleaner way to handle files. It is—but it's so much more. Context managers are one of Python's most elegant patterns for resource management. Here's everything I've learned about them.

What Problem Do Context Managers Solve?

Before context managers, handling resources looked like this:

file = open("data.txt")
try:
    data = file.read()
    process(data)
finally:
    file.close()

Forget the finally? You've got a resource leak. And this pattern repeats everywhere—files, locks, database connections, network sockets. It's tedious and error-prone.

Context managers encapsulate this setup/teardown pattern:

with open("data.txt") as file:
    data = file.read()
    process(data)
# File is automatically closed, even if an exception occurs

The with statement guarantees cleanup. It's not just shorter code—it's safer code.

The Protocol: __enter__ and __exit__

Any object with __enter__ and __exit__ methods is a context manager. Here's how it works:

class ManagedResource:
    def __enter__(self):
        print("Acquiring resource")
        return self  # This becomes the 'as' variable
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        print("Releasing resource")
        return False  # Don't suppress exceptions
 
with ManagedResource() as resource:
    print("Using resource")
    
# Output:
# Acquiring resource
# Using resource
# Releasing resource

How the Protocol Works

When you write with X as Y:

  1. Python calls X.__enter__() and assigns the result to Y
  2. The body of the with block runs
  3. Python calls X.__exit__(exc_type, exc_val, exc_tb), passing exception info (or None if no exception)

The key insight: __exit__ always runs, even if the body raises an exception. This is the guarantee that makes context managers so powerful.

A Complete Example

class Timer:
    def __enter__(self):
        import time
        self.start = time.perf_counter()
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        import time
        self.elapsed = time.perf_counter() - self.start
        print(f"Elapsed: {self.elapsed:.3f}s")
        return False
 
with Timer() as t:
    # Do some work
    sum(range(1_000_000))
    
# Output: Elapsed: 0.032s

Exception Handling in __exit__

The three arguments to __exit__ tell you about any exception that occurred:

class ExceptionDemo:
    def __enter__(self):
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is None:
            print("No exception")
        else:
            print(f"Exception: {exc_type.__name__}: {exc_val}")
        return False  # Let the exception propagate
 
# No exception
with ExceptionDemo():
    pass
# Output: No exception
 
# With exception
with ExceptionDemo():
    raise ValueError("oops")
# Output: Exception: ValueError: oops
# Then the exception propagates

Suppressing Exceptions

If __exit__ returns True, the exception is suppressed:

class Suppressor:
    def __enter__(self):
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is ValueError:
            print(f"Suppressing: {exc_val}")
            return True  # Suppress ValueError
        return False  # Let other exceptions propagate
 
with Suppressor():
    raise ValueError("this is fine")
print("Continues normally")
 
# Output:
# Suppressing: this is fine
# Continues normally

This is useful for optional cleanup or handling expected errors. But use it sparingly—silently swallowing exceptions can hide bugs.

The contextlib Module

Writing classes for every context manager gets tedious. The contextlib module provides shortcuts.

@contextmanager: The Generator Shortcut

The most useful tool is @contextmanager. It lets you write context managers as generators:

from contextlib import contextmanager
 
@contextmanager
def timer():
    import time
    start = time.perf_counter()
    yield  # This is where the 'with' body runs
    elapsed = time.perf_counter() - start
    print(f"Elapsed: {elapsed:.3f}s")
 
with timer():
    sum(range(1_000_000))

The pattern is simple:

  1. Setup code before yield
  2. yield the value (or nothing) for the as variable
  3. Cleanup code after yield

Yielding a Value

@contextmanager
def open_file(path, mode="r"):
    file = open(path, mode)
    try:
        yield file  # This becomes the 'as' variable
    finally:
        file.close()
 
with open_file("data.txt") as f:
    content = f.read()

Always use try/finally in @contextmanager! Without it, exceptions in the body skip your cleanup code:

# BAD - no cleanup on exception
@contextmanager
def bad_resource():
    resource = acquire()
    yield resource
    release(resource)  # Skipped if exception in body!
 
# GOOD - cleanup guaranteed
@contextmanager
def good_resource():
    resource = acquire()
    try:
        yield resource
    finally:
        release(resource)  # Always runs

closing: For Objects with close()

If an object has a close() method but isn't a context manager:

from contextlib import closing
from urllib.request import urlopen
 
with closing(urlopen("https://example.com")) as page:
    html = page.read()
# page.close() called automatically

suppress: Ignore Specific Exceptions

from contextlib import suppress
 
# Instead of:
try:
    os.remove("file.txt")
except FileNotFoundError:
    pass
 
# Use:
with suppress(FileNotFoundError):
    os.remove("file.txt")

Cleaner for optional operations where you don't care if they fail.

redirect_stdout and redirect_stderr

Capture or redirect output:

from contextlib import redirect_stdout
from io import StringIO
 
# Capture stdout
buffer = StringIO()
with redirect_stdout(buffer):
    print("This goes to buffer")
    
captured = buffer.getvalue()  # "This goes to buffer\n"
 
# Redirect to file
with open("log.txt", "w") as f:
    with redirect_stdout(f):
        print("This goes to file")

nullcontext: A No-Op Context Manager

Useful when you conditionally need a context manager:

from contextlib import nullcontext
 
def process(path, use_lock=True):
    lock = threading.Lock() if use_lock else None
    
    with lock if lock else nullcontext():
        do_work(path)

Or with Python 3.10+ pattern matching. I often use this for optional features:

@contextmanager
def maybe_timer(enabled=True):
    if enabled:
        import time
        start = time.perf_counter()
        yield
        print(f"Elapsed: {time.perf_counter() - start:.3f}s")
    else:
        yield
 
# Or just use nullcontext
timer_ctx = timer() if debug else nullcontext()
with timer_ctx:
    do_work()

Async Context Managers

Async code has its own context manager protocol: __aenter__ and __aexit__.

The Async Protocol

class AsyncResource:
    async def __aenter__(self):
        print("Acquiring async resource")
        await asyncio.sleep(0.1)  # Simulate async setup
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Releasing async resource")
        await asyncio.sleep(0.1)  # Simulate async cleanup
        return False
 
async def main():
    async with AsyncResource() as resource:
        print("Using resource")
        await asyncio.sleep(0.1)
 
asyncio.run(main())

@asynccontextmanager

Just like @contextmanager, but for async:

from contextlib import asynccontextmanager
 
@asynccontextmanager
async def async_timer():
    import time
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        print(f"Elapsed: {elapsed:.3f}s")
 
async def main():
    async with async_timer():
        await asyncio.sleep(1)

Async Database Connection Pool

A realistic example:

@asynccontextmanager
async def get_connection(pool):
    conn = await pool.acquire()
    try:
        yield conn
    finally:
        await pool.release(conn)
 
async def main():
    pool = await create_pool()
    
    async with get_connection(pool) as conn:
        result = await conn.fetch("SELECT * FROM users")

aclosing: Async Version of closing

from contextlib import aclosing
 
async with aclosing(async_generator()) as gen:
    async for item in gen:
        process(item)
# Generator properly closed

Practical Examples

Here's where context managers really shine—real-world resource management.

File Handling (The Classic)

# The built-in open() is a context manager
with open("data.txt", "w") as f:
    f.write("Hello, World!")
# File closed automatically, even on exception
 
# Custom atomic write
@contextmanager
def atomic_write(path):
    """Write to temp file, rename on success."""
    import os
    import tempfile
    
    dir_name = os.path.dirname(path) or "."
    fd, temp_path = tempfile.mkstemp(dir=dir_name)
    
    try:
        with os.fdopen(fd, "w") as f:
            yield f
        os.replace(temp_path, path)  # Atomic on POSIX
    except:
        os.unlink(temp_path)  # Clean up temp file
        raise
 
with atomic_write("config.json") as f:
    json.dump(config, f)
# Either fully written or original untouched

Locks and Synchronization

import threading
 
lock = threading.Lock()
 
# Manual way (don't do this)
lock.acquire()
try:
    shared_resource.update()
finally:
    lock.release()
 
# Context manager way (do this)
with lock:
    shared_resource.update()

Custom lock with timeout:

@contextmanager
def acquire_timeout(lock, timeout):
    acquired = lock.acquire(timeout=timeout)
    if not acquired:
        raise TimeoutError("Could not acquire lock")
    try:
        yield
    finally:
        lock.release()
 
with acquire_timeout(lock, timeout=5.0):
    critical_section()

Database Connections

@contextmanager
def get_db_connection(config):
    import psycopg2
    
    conn = psycopg2.connect(**config)
    try:
        yield conn
        conn.commit()  # Commit on success
    except:
        conn.rollback()  # Rollback on error
        raise
    finally:
        conn.close()
 
with get_db_connection(DB_CONFIG) as conn:
    cursor = conn.cursor()
    cursor.execute("INSERT INTO users (name) VALUES (%s)", ("Alice",))
# Committed if successful, rolled back if not

Database Transactions

@contextmanager
def transaction(conn):
    """Wrap operations in a transaction."""
    try:
        yield conn
        conn.commit()
    except:
        conn.rollback()
        raise
 
with get_db_connection(config) as conn:
    with transaction(conn):
        cursor = conn.cursor()
        cursor.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
        cursor.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
    # Both updates committed together, or neither

Timing and Profiling

@contextmanager
def timer(name=""):
    import time
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        label = f" [{name}]" if name else ""
        print(f"Elapsed{label}: {elapsed:.4f}s")
 
with timer("data processing"):
    process_large_dataset()
# Output: Elapsed [data processing]: 2.3456s

Reusable timer class:

class Timer:
    def __init__(self, name=""):
        self.name = name
        self.elapsed = 0
    
    def __enter__(self):
        import time
        self.start = time.perf_counter()
        return self
    
    def __exit__(self, *args):
        import time
        self.elapsed = time.perf_counter() - self.start
        return False
 
with Timer("query") as t:
    run_query()
print(f"Query took {t.elapsed:.3f}s")

Temporary State Changes

@contextmanager
def temp_chdir(path):
    """Temporarily change working directory."""
    import os
    old_dir = os.getcwd()
    os.chdir(path)
    try:
        yield
    finally:
        os.chdir(old_dir)
 
with temp_chdir("/tmp"):
    # Working in /tmp
    process_temp_files()
# Back to original directory
@contextmanager
def temp_env(**env_vars):
    """Temporarily set environment variables."""
    import os
    old_values = {}
    
    for key, value in env_vars.items():
        old_values[key] = os.environ.get(key)
        if value is None:
            os.environ.pop(key, None)
        else:
            os.environ[key] = value
    
    try:
        yield
    finally:
        for key, old_value in old_values.items():
            if old_value is None:
                os.environ.pop(key, None)
            else:
                os.environ[key] = old_value
 
with temp_env(DEBUG="1", API_KEY="test_key"):
    run_tests()
# Environment restored

Mocking and Testing

@contextmanager
def mock_time(frozen_time):
    """Freeze time for testing."""
    import time
    original = time.time
    time.time = lambda: frozen_time
    try:
        yield
    finally:
        time.time = original
 
with mock_time(1609459200):  # 2021-01-01 00:00:00
    assert time.time() == 1609459200

Nested Context Managers

Multiple context managers can be nested.

Multiple with Statements

with open("input.txt") as infile:
    with open("output.txt", "w") as outfile:
        outfile.write(infile.read().upper())

Single Line (Python 3.1+)

with open("input.txt") as infile, open("output.txt", "w") as outfile:
    outfile.write(infile.read().upper())

Parenthesized (Python 3.10+)

For better formatting:

with (
    open("input.txt") as infile,
    open("output.txt", "w") as outfile,
    Timer("processing") as t,
):
    outfile.write(infile.read().upper())

Order Matters

Context managers are entered in order and exited in reverse order (like a stack):

@contextmanager
def announce(name):
    print(f"Entering {name}")
    yield
    print(f"Exiting {name}")
 
with announce("A"), announce("B"), announce("C"):
    print("In body")
 
# Output:
# Entering A
# Entering B
# Entering C
# In body
# Exiting C
# Exiting B
# Exiting A

ExitStack: Dynamic Context Management

What if you don't know how many context managers you need until runtime? ExitStack handles this.

Basic Usage

from contextlib import ExitStack
 
with ExitStack() as stack:
    files = [stack.enter_context(open(f)) for f in filenames]
    # All files open
    for f in files:
        process(f)
# All files closed

Why ExitStack?

Without ExitStack, opening a dynamic number of files is awkward:

# Without ExitStack - ugly and error-prone
files = []
try:
    for filename in filenames:
        files.append(open(filename))
    # Process files
finally:
    for f in files:
        f.close()  # What if one close() fails?

ExitStack handles cleanup correctly even if individual close operations fail.

Callbacks

Register arbitrary cleanup functions:

with ExitStack() as stack:
    resource = acquire_resource()
    stack.callback(release_resource, resource)
    
    # More setup
    stack.callback(lambda: print("Cleanup 1"))
    stack.callback(lambda: print("Cleanup 2"))
    
    # Use resources
# Output:
# Cleanup 2
# Cleanup 1
# (resource released)

push: Conditional Cleanup

with ExitStack() as stack:
    conn = connect()
    stack.push(conn)  # Will call conn.__exit__
    
    if needs_transaction:
        trans = conn.begin()
        stack.push(trans)  # Will call trans.__exit__
    
    # Work with conn (and maybe trans)

pop_all: Transfer Ownership

Sometimes you want to start with managed cleanup, then take over:

def acquire_resources():
    with ExitStack() as stack:
        resource1 = stack.enter_context(Resource1())
        resource2 = stack.enter_context(Resource2())
        
        # All good, transfer ownership to caller
        return stack.pop_all(), resource1, resource2
 
# Caller now responsible for cleanup
exit_stack, r1, r2 = acquire_resources()
try:
    use_resources(r1, r2)
finally:
    exit_stack.close()

AsyncExitStack

For async context managers:

from contextlib import AsyncExitStack
 
async def main():
    async with AsyncExitStack() as stack:
        connections = [
            await stack.enter_async_context(get_connection(host))
            for host in hosts
        ]
        # All connections open
        await process_all(connections)
    # All connections closed

Real-World Example: Resource Pool

@contextmanager
def resource_pool(configs):
    """Open multiple resources, close all on exit."""
    with ExitStack() as stack:
        resources = []
        for config in configs:
            resource = stack.enter_context(Resource(config))
            resources.append(resource)
        yield resources
 
with resource_pool(configs) as resources:
    for r in resources:
        r.process()

Example: Multi-File Processing

def process_files(input_paths, output_path):
    with ExitStack() as stack:
        # Open all input files
        inputs = [stack.enter_context(open(p)) for p in input_paths]
        
        # Open output file
        output = stack.enter_context(open(output_path, "w"))
        
        # Merge inputs to output
        for infile in inputs:
            output.write(infile.read())

Patterns and Best Practices

Pattern 1: Setup/Teardown

The classic pattern—acquire then release:

@contextmanager
def managed_resource():
    resource = acquire()
    try:
        yield resource
    finally:
        release(resource)

Pattern 2: Exception Translation

Convert low-level exceptions to domain exceptions:

@contextmanager
def database_operation():
    try:
        yield
    except psycopg2.IntegrityError as e:
        raise DuplicateKeyError(str(e)) from e
    except psycopg2.OperationalError as e:
        raise DatabaseUnavailableError(str(e)) from e

Pattern 3: Logging and Tracing

@contextmanager
def trace(operation_name):
    import logging
    import uuid
    
    trace_id = str(uuid.uuid4())[:8]
    logger = logging.getLogger(__name__)
    
    logger.info(f"[{trace_id}] Starting: {operation_name}")
    try:
        yield trace_id
    except Exception as e:
        logger.error(f"[{trace_id}] Failed: {operation_name} - {e}")
        raise
    else:
        logger.info(f"[{trace_id}] Completed: {operation_name}")
 
with trace("user_signup") as trace_id:
    create_user(data)

Pattern 4: Optional Context Manager

@contextmanager
def optional_lock(lock, use_lock=True):
    if use_lock:
        with lock:
            yield
    else:
        yield
 
# Or simpler with nullcontext:
lock_ctx = lock if use_lock else nullcontext()
with lock_ctx:
    do_work()

Pattern 5: Reentrant Context Manager

Some contexts can be entered multiple times:

class ReentrantTimer:
    def __init__(self):
        self.depth = 0
        self.elapsed = 0
    
    def __enter__(self):
        if self.depth == 0:
            import time
            self.start = time.perf_counter()
        self.depth += 1
        return self
    
    def __exit__(self, *args):
        self.depth -= 1
        if self.depth == 0:
            import time
            self.elapsed = time.perf_counter() - self.start
        return False
 
timer = ReentrantTimer()
with timer:
    do_something()
    with timer:  # Nested entry
        do_more()
    # Still timing
# Now elapsed is set

Common Gotchas

Gotcha 1: Forgetting try/finally in @contextmanager

# WRONG - cleanup skipped on exception
@contextmanager
def bad():
    resource = acquire()
    yield resource
    release(resource)  # Never runs if exception!
 
# RIGHT
@contextmanager
def good():
    resource = acquire()
    try:
        yield resource
    finally:
        release(resource)

Gotcha 2: Generator Doesn't Yield

@contextmanager
def broken():
    if some_condition:
        return  # Error! Must yield exactly once
    yield
 
# Fix:
@contextmanager
def fixed():
    if some_condition:
        yield None
        return
    yield something

Gotcha 3: Yielding in a Loop

# WRONG - context manager, not generator
@contextmanager
def wrong():
    for item in items:
        yield item  # Error!
 
# This is a generator, use it as such:
def right():
    for item in items:
        yield item

Gotcha 4: Suppressing Exceptions Unexpectedly

def __exit__(self, exc_type, exc_val, exc_tb):
    self.cleanup()
    return self.suppress_errors  # Be explicit!

Gotcha 5: Not Handling __exit__ Exceptions

If __exit__ raises, the original exception is lost:

class Risky:
    def __exit__(self, *args):
        raise RuntimeError("Cleanup failed")  # Original exception lost!
 
# Better: log and re-raise original
def __exit__(self, exc_type, exc_val, exc_tb):
    try:
        self.cleanup()
    except Exception:
        if exc_type is None:
            raise  # No original exception, ok to raise
        logging.exception("Cleanup failed")  # Log, keep original
    return False

When to Use Context Managers

Use context managers for:

  • Resources that need cleanup (files, connections, locks)
  • Temporary state changes (directory, environment, config)
  • Setup/teardown patterns (timing, tracing, mocking)
  • Transaction boundaries (commit/rollback)
  • Scoped behavior (locking, suppressing warnings)

Consider alternatives when:

  • No cleanup is needed
  • The "context" spans multiple functions (use dependency injection)
  • You need to yield multiple values (use generators)

Summary

Context managers are Python's answer to the resource management problem. The key points:

  1. The protocol: __enter__ sets up, __exit__ tears down (always!)
  2. @contextmanager: Write context managers as generators
  3. contextlib: closing, suppress, redirect_stdout, nullcontext
  4. Async support: async with, @asynccontextmanager
  5. ExitStack: Manage dynamic numbers of contexts
  6. Always use try/finally in @contextmanager

Start simple with the with statement for files and locks. As you need more control, the full protocol and contextlib tools are there. Context managers make resource management both elegant and safe.

React to this post: