The contextlib module provides utilities for creating and working with context managers beyond the basic __enter__/__exit__ protocol.

@contextmanager Decorator

Create context managers from generators:

from contextlib import contextmanager
import time
 
@contextmanager
def timer(label: str):
    """Time the body of a ``with`` block and print the result.

    On exit -- whether the body finished normally or raised -- prints
    ``"<label>: <seconds>s"`` with the elapsed time to three decimals.
    """
    began = time.perf_counter()
    try:
        yield
    finally:
        # ``finally`` guarantees the report even when the body raises.
        duration = time.perf_counter() - began
        print(f"{label}: {duration:.3f}s")
 
# Example: wrap any block to report how long it took.
with timer("Processing"):
    time.sleep(0.5)
# Prints roughly: Processing: 0.500s

Yielding Values

from contextlib import contextmanager
 
@contextmanager
def temp_file(content: str):
    """Yield the path of a temporary file pre-filled with ``content``.

    The file is created with ``tempfile.mkstemp()``, written and closed
    before the path is handed to the ``with`` body, and removed when the
    block exits (normally or via an exception).

    Args:
        content: Text to write into the file.

    Yields:
        The filesystem path of the temporary file.
    """
    import os
    import tempfile

    fd, path = tempfile.mkstemp()
    try:
        with os.fdopen(fd, 'w') as f:
            f.write(content)
        yield path  # Value available as 'as' target
    finally:
        # The body may already have deleted or moved the file; cleanup
        # must not turn that into a FileNotFoundError.
        try:
            os.unlink(path)
        except FileNotFoundError:
            pass
 
# Example: the yielded path is bound by ``as`` and is usable inside the block.
with temp_file("test data") as path:
    print(f"File at: {path}")
    with open(path) as f:
        print(f.read())
# File automatically deleted

suppress() - Ignore Exceptions

from contextlib import suppress
 
# Instead of try/except pass
import os  # needed here: os.remove() below runs before the later `import os`

try:
    os.remove('nonexistent.txt')
except FileNotFoundError:
    pass
 
# Use suppress
from contextlib import suppress
import os
 
# suppress() swallows only the listed exception types raised in the body;
# execution resumes with the first statement after the `with` block.
with suppress(FileNotFoundError):
    os.remove('nonexistent.txt')
 
# Multiple exception types
with suppress(FileNotFoundError, PermissionError):
    os.remove('file.txt')

redirect_stdout / redirect_stderr

from contextlib import redirect_stdout, redirect_stderr
from io import StringIO
 
# Capture stdout
# redirect_stdout() returns its target from __enter__, so the buffer can be
# created and bound in the same `with` statement.
with redirect_stdout(StringIO()) as buffer:
    print("This goes to buffer")
    print("So does this")

output = buffer.getvalue()
print(f"Captured: {output}")
 
# Redirect to file
# One `with` for both managers: the redirect is undone first, then the file
# is closed.
with open('log.txt', 'w') as f, redirect_stdout(f):
    print("This goes to file")
 
# Suppress output
import os

# Open the sink in the same `with` so the file handle is closed afterwards;
# passing a bare open(...) to redirect_stdout() would leak it.
with open(os.devnull, 'w') as devnull, redirect_stdout(devnull):
    print("This is discarded")

ExitStack - Dynamic Context Management

from contextlib import ExitStack
 
# Manage variable number of context managers
def process_files(paths: list):
    """Open every path in ``paths``, then print each file's contents.

    All files are opened up front and registered on one ``ExitStack``, so
    however many there are, they are all closed when the block exits.
    """
    with ExitStack() as stack:
        handles = []
        for p in paths:
            handles.append(stack.enter_context(open(p)))

        for handle in handles:
            print(handle.read())
        # Leaving the `with` closes every registered file.
 
# Conditional context managers
def maybe_transaction(use_transaction: bool) -> None:
    """Run work, optionally inside ``database.transaction()``.

    When ``use_transaction`` is true the transaction context manager is
    entered on the stack and exited when the ``with`` block ends; otherwise
    the stack stays empty and the work runs unwrapped.

    NOTE(review): ``database`` is not defined in this snippet -- presumably
    a project-level object exposing ``transaction()``; confirm.
    """
    with ExitStack() as stack:
        if use_transaction:
            stack.enter_context(database.transaction())
        
        # Do work...

ExitStack Callbacks

from contextlib import ExitStack
 
def cleanup(name):
    """Print a notice that ``name`` is being cleaned up."""
    notice = f"Cleaning up {name}"
    print(notice)
 
with ExitStack() as stack:
    # Callbacks run when the block exits, last registered first (LIFO),
    # which is why resource2 is cleaned up before resource1 below.
    stack.callback(cleanup, "resource1")
    stack.callback(cleanup, "resource2")
    
    print("Doing work...")
# Output:
# Doing work...
# Cleaning up resource2
# Cleaning up resource1

nullcontext - Optional Context Manager

from contextlib import nullcontext
 
import threading

# One lock shared by every call. A lock created inside process() would be
# private to that call and could never exclude another thread.
_PROCESS_LOCK = threading.Lock()


def process(use_lock: bool = False):
    """Run ``do_work()``, optionally serialized behind a shared lock.

    Args:
        use_lock: When true, hold the module-level lock for the duration
            of the work; otherwise use ``nullcontext()`` so the same
            ``with`` statement runs without any locking.
    """
    guard = _PROCESS_LOCK if use_lock else nullcontext()

    with guard:
        # Work is done with or without lock
        do_work()
 
# Also useful for optional file handling
def read_data(path: str = None):
    """Return the full contents of ``path``, or of standard input.

    Args:
        path: File to read. When omitted (or empty), ``sys.stdin`` is
            read instead.

    Returns:
        Everything read from the chosen source.

    ``nullcontext(sys.stdin)`` lets both cases share one ``with``
    statement without closing stdin on exit.
    """
    import sys  # local import: this snippet never imports sys at module level

    cm = open(path) if path else nullcontext(sys.stdin)
    with cm as f:
        return f.read()

closing() - Ensure close() is Called

from contextlib import closing
from urllib.request import urlopen
 
# urlopen returns object with close() but isn't a context manager in older Python
# closing() calls response.close() when the block exits, even on error.
with closing(urlopen('https://example.com')) as response:
    html = response.read()
 
# Works with any object that has close()
class Connection:
    """Minimal stand-in resource: all it offers is ``close()``."""

    def close(self):
        """Print a message announcing that the connection was closed."""
        print("Connection closed")
 
# closing() hands the wrapped object to `as` and calls its close() on exit.
with closing(Connection()) as conn:
    pass  # Connection.close() called automatically

Async Context Managers

from contextlib import asynccontextmanager
import asyncio
 
@asynccontextmanager
async def async_timer(label: str):
    """Time the body of an ``async with`` block and print the result.

    Uses the running event loop's clock and prints
    ``"<label>: <seconds>s"`` on exit, whether or not the body raised.
    """
    # get_running_loop() is the preferred way to reach the loop from inside
    # a coroutine; fetch it once and reuse it for both readings.
    loop = asyncio.get_running_loop()
    start = loop.time()
    try:
        yield
    finally:
        elapsed = loop.time() - start
        print(f"{label}: {elapsed:.3f}s")
 
async def main() -> None:
    """Sleep for half a second inside ``async_timer`` to demonstrate it."""
    async with async_timer("Async operation"):
        await asyncio.sleep(0.5)
 
# asyncio.run() runs main() to completion on a fresh event loop.
asyncio.run(main())

AsyncExitStack

from contextlib import AsyncExitStack
import aiofiles
 
async def process_files_async(paths: list):
    """Asynchronously open every path, then print each file's contents.

    Each ``aiofiles.open(...)`` context manager is entered on a shared
    ``AsyncExitStack``, so everything that was opened is closed when the
    ``async with`` block ends.
    """
    async with AsyncExitStack() as stack:
        handles = []
        for p in paths:
            handle = await stack.enter_async_context(aiofiles.open(p))
            handles.append(handle)

        for handle in handles:
            content = await handle.read()
            print(content)

Reusable Context Manager

from contextlib import contextmanager
 
# Problem: generator-based CMs are single-use
@contextmanager
def single_use():
    """Print "Enter" on entry and "Exit" on normal exit.

    There is no try/finally around the ``yield``, so "Exit" is skipped when
    the body raises.
    """
    print("Enter")
    yield
    print("Exit")
 
# Calling single_use() builds one context-manager object around one generator.
cm = single_use()
with cm:
    pass  # Works
 
# with cm:  # RuntimeError: generator didn't yield
 
# Solution: use a class-based context manager, which can be entered repeatedly
class ReusableTimer:
    """Class-based timer that can be entered any number of times.

    Each ``with`` block records its own start time in ``__enter__`` and
    prints ``"<label>: <seconds>s"`` in ``__exit__``.
    """

    def __init__(self, label):
        self.label = label

    def __enter__(self):
        # A fresh start time per entry is what makes the object reusable.
        self.start = time.perf_counter()
        return self

    def __exit__(self, *args):
        # Implicitly returns None, so exceptions from the body propagate.
        now = time.perf_counter()
        print(f"{self.label}: {now - self.start:.3f}s")
 
# NOTE(review): if these snippets share one module, this rebinds the name
# `timer`, hiding the timer() context manager defined earlier.
timer = ReusableTimer("Operation")
with timer:
    pass
with timer:  # Works!
    pass

Exception Handling in Context Managers

from contextlib import contextmanager
 
@contextmanager
def handle_errors():
    """Suppress ``ValueError`` and log-then-propagate ``TypeError``.

    A ``ValueError`` raised in the ``with`` body is printed and swallowed.
    A ``TypeError`` is printed and re-raised. Any other exception passes
    through untouched.
    """
    try:
        yield
    except (ValueError, TypeError) as exc:
        if isinstance(exc, ValueError):
            # Leaving the handler without raising suppresses the exception.
            print(f"Handled ValueError: {exc}")
        else:
            print(f"Logging TypeError: {exc}")
            raise  # Re-raise the exception
 
# The ValueError is printed and suppressed; execution continues afterwards.
with handle_errors():
    raise ValueError("This is handled")
 
# with handle_errors():
#     raise TypeError("This propagates")

Chained Context Managers

from contextlib import contextmanager
 
@contextmanager
def logged(name: str):
    """Print "Entering <name>" on entry and "Exiting <name>" on exit.

    The exit message sits in a ``finally`` clause, so it is printed even
    when the body raises.
    """
    print(f"Entering {name}")
    try:
        yield
    finally:
        print(f"Exiting {name}")
 
@contextmanager
def timed():
    """Print ``"Elapsed: <seconds>s"`` for the enclosed ``with`` block.

    The report is emitted from ``finally``, so it appears even if the body
    raises.
    """
    t0 = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - t0
        print(f"Elapsed: {elapsed:.3f}s")
 
# Nest them
# Managers are entered left to right and exited in reverse order.
with logged("operation"), timed():
    time.sleep(0.1)
 
# Or compose
@contextmanager
def logged_and_timed(name: str):
    """Combine ``logged(name)`` and ``timed()`` into one context manager.

    ``logged`` is the outer manager and ``timed`` the inner one.
    """
    with logged(name):
        with timed():
            yield

Database Transaction Pattern

from contextlib import contextmanager
 
@contextmanager
def transaction(connection):
    """Yield a cursor whose work is committed or rolled back as a unit.

    If the ``with`` body and the commit both succeed the changes are kept.
    If either raises an ``Exception``, ``connection.rollback()`` is called
    and the exception is re-raised. The cursor is closed in every case.

    Args:
        connection: Object providing ``cursor()``, ``commit()`` and
            ``rollback()``.

    Yields:
        The cursor returned by ``connection.cursor()``.
    """
    cur = connection.cursor()
    try:
        # Inner try: commit/rollback decision. Outer try: cursor cleanup.
        try:
            yield cur
            connection.commit()
        except Exception:
            connection.rollback()
            raise
    finally:
        cur.close()
 
# NOTE(review): `db_connection` is not defined in this snippet -- presumably
# a DB-API style connection created elsewhere; confirm.
with transaction(db_connection) as cursor:
    cursor.execute("INSERT INTO users VALUES (?)", ("Alice",))
    cursor.execute("INSERT INTO users VALUES (?)", ("Bob",))
# Auto-commit on success, rollback on exception

Resource Pool Pattern

from contextlib import contextmanager
from queue import Queue
 
class ConnectionPool:
    """Fixed-size pool that lends out connections via a context manager.

    ``size`` placeholder connections are created up front and kept in a
    ``queue.Queue``; ``connection()`` checks one out for the duration of a
    ``with`` block and always puts it back afterwards.
    """

    def __init__(self, size: int):
        self._pool = Queue(maxsize=size)
        self._next_id = 0  # source of unique placeholder ids
        for _ in range(size):
            self._pool.put(self._create_connection())

    def _create_connection(self):
        """Build a placeholder connection with an id unique within the pool."""
        # id(object()) is unsuitable here: the temporary object is freed at
        # once, so successive calls can hand back the same value.
        conn_id = self._next_id
        self._next_id += 1
        return {"id": conn_id}  # Placeholder

    @contextmanager
    def connection(self):
        """Borrow a connection; it is returned to the pool on exit.

        ``Queue.get()`` blocks while every connection is checked out.
        """
        conn = self._pool.get()
        try:
            yield conn
        finally:
            self._pool.put(conn)
 
# Example: borrow one of five pooled connections for the span of the block.
pool = ConnectionPool(5)
with pool.connection() as conn:
    print(f"Using connection {conn['id']}")

Summary Table

Utility | Use Case
@contextmanager | Create CM from generator
@asynccontextmanager | Async version
suppress() | Ignore specific exceptions
redirect_stdout/stderr | Capture or redirect output
ExitStack | Dynamic/conditional CMs
nullcontext() | Optional/no-op CM
closing() | Ensure close() is called

The contextlib module transforms resource management from boilerplate into clean, composable patterns.

React to this post: