Threading in Python is ideal for IO-bound tasks. The GIL limits CPU parallelism, but threads excel at waiting on network, disk, and other IO operations.

Basic Thread Creation

import threading
import time


def worker(name, duration):
    """Print a start message, block for *duration* seconds, then print a done message."""
    print(f"Thread {name} starting")
    time.sleep(duration)
    print(f"Thread {name} done")


# Launch three workers, each sleeping for one second.
threads = [threading.Thread(target=worker, args=(idx, 1)) for idx in range(3)]
for thread in threads:
    thread.start()

# join() blocks until each worker has finished.
for thread in threads:
    thread.join()

print("All threads complete")

Thread Pool with concurrent.futures

from concurrent.futures import ThreadPoolExecutor, as_completed
import urllib.request


def fetch_url(url):
    """Fetch *url* and return ``(url, size_in_bytes)``.

    Propagates whatever ``urllib`` raises on failure (URLError, HTTPError,
    socket timeout) so the caller can attribute errors per URL.
    """
    with urllib.request.urlopen(url, timeout=5) as response:
        return url, len(response.read())


urls = [
    "https://python.org",
    "https://pypi.org",
    "https://docs.python.org",
]

with ThreadPoolExecutor(max_workers=5) as executor:
    # Map each future back to its URL so failures can still be attributed.
    futures = {executor.submit(fetch_url, url): url for url in urls}

    for future in as_completed(futures):
        url = futures[future]
        try:
            # fetch_url already returns the URL; only the size is new
            # information, so don't redundantly re-bind `url` here (the
            # original shadowed the mapping lookup above for no reason).
            _, size = future.result()
            print(f"{url}: {size} bytes")
        except Exception as e:
            print(f"{url}: Error - {e}")

Lock for Shared State

import threading
 
class Counter:
    """An integer counter whose updates are serialized by a mutex."""

    def __init__(self):
        self.value = 0
        self.lock = threading.Lock()

    def increment(self):
        """Atomically add one to the counter."""
        # Read-modify-write is safe here because the lock is held throughout.
        with self.lock:
            self.value += 1

    def get(self):
        """Return the current count, reading under the lock."""
        with self.lock:
            return self.value
 
counter = Counter()


def _bump_thousand():
    # Each thread performs one thousand increments.
    for _ in range(1000):
        counter.increment()


threads = [threading.Thread(target=_bump_thousand) for _ in range(10)]

for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter.get())  # 10000

RLock for Nested Locking

import threading
 
class Account:
    """A bank account guarded by a reentrant lock.

    An RLock lets the thread that already holds the lock acquire it again,
    which ``transfer_to`` relies on when it calls ``withdraw`` internally.
    """

    def __init__(self, balance):
        self.balance = balance
        self.lock = threading.RLock()  # Reentrant: same thread may re-acquire

    def withdraw(self, amount):
        """Deduct *amount* if funds suffice; return True on success."""
        with self.lock:
            if self.balance < amount:
                return False
            self.balance -= amount
            return True

    def transfer_to(self, other, amount):
        """Move *amount* to *other*; return True if the withdrawal succeeded."""
        with self.lock:
            # Nested acquisition of self.lock — only works because it's an RLock.
            if not self.withdraw(amount):
                return False
            other.deposit(amount)
            return True

    def deposit(self, amount):
        """Add *amount* to the balance."""
        with self.lock:
            self.balance += amount

Event for Signaling

import threading
import time
 
event = threading.Event()


def waiter(name):
    """Block until the shared event is set, then proceed."""
    print(f"{name} waiting for signal...")
    event.wait()
    print(f"{name} proceeding!")


def signaler():
    """Sleep two seconds, then release every waiter at once."""
    time.sleep(2)
    print("Sending signal")
    event.set()


# Two waiters plus the single signaler that unblocks them both.
threads = [threading.Thread(target=waiter, args=(label,)) for label in ("A", "B")]
threads.append(threading.Thread(target=signaler))

for t in threads:
    t.start()
for t in threads:
    t.join()

Condition for Complex Coordination

import threading
import time
 
class BoundedBuffer:
    """A blocking FIFO buffer with a fixed capacity (producer/consumer)."""

    def __init__(self, capacity):
        self.buffer = []
        self.capacity = capacity
        self.condition = threading.Condition()

    def put(self, item):
        """Append *item*, blocking while the buffer is full."""
        with self.condition:
            # A while-loop (not if) re-checks the predicate after every wakeup.
            while len(self.buffer) >= self.capacity:
                self.condition.wait()
            self.buffer.append(item)
            self.condition.notify()

    def get(self):
        """Remove and return the oldest item, blocking while the buffer is empty."""
        with self.condition:
            while not self.buffer:
                self.condition.wait()
            head = self.buffer.pop(0)
            self.condition.notify()
            return head
 
buffer = BoundedBuffer(5)


def producer():
    """Push ten integers into the shared buffer, blocking when it fills."""
    for n in range(10):
        buffer.put(n)
        print(f"Produced: {n}")


def consumer():
    """Drain ten items, pausing briefly so the producer can run ahead."""
    for _ in range(10):
        item = buffer.get()
        print(f"Consumed: {item}")
        time.sleep(0.1)


threading.Thread(target=producer).start()
threading.Thread(target=consumer).start()

Semaphore for Rate Limiting

import threading
import time
 
semaphore = threading.Semaphore(3)  # At most 3 threads inside the guarded section


def limited_task(task_id):
    """Run a one-second task; the semaphore caps concurrency at three."""
    with semaphore:
        print(f"Task {task_id} running")
        time.sleep(1)
        print(f"Task {task_id} done")


threads = [threading.Thread(target=limited_task, args=(tid,)) for tid in range(10)]

for t in threads:
    t.start()
for t in threads:
    t.join()

Barrier for Synchronization Points

import threading
import time
import random
 
barrier = threading.Barrier(3)


def worker(worker_id):
    """Do independent phase-1 work, then sync at the barrier before phase 2."""
    # Phase 1: each worker finishes at a different, random time.
    time.sleep(random.uniform(0.5, 1.5))
    print(f"Worker {worker_id} finished phase 1")

    # No thread passes this point until all three have arrived.
    barrier.wait()

    # Phase 2: everyone resumes together.
    print(f"Worker {worker_id} starting phase 2")


threads = [threading.Thread(target=worker, args=(wid,)) for wid in range(3)]

for t in threads:
    t.start()
for t in threads:
    t.join()

Timer for Delayed Execution

import threading
 
def delayed_task():
    """Fire once when the timer elapses."""
    print("Task executed after delay")


# Schedule delayed_task to run 3 seconds from now on a helper thread.
timer = threading.Timer(3.0, delayed_task)
timer.start()

# timer.cancel() would abort it, if called before the delay expires.
# timer.cancel()

Thread-Local Data

import threading
 
local_data = threading.local()


def worker(value):
    """Store *value* in thread-local storage and report this thread's copy."""
    local_data.value = value
    # threading.local() gives every thread an independent namespace,
    # so concurrent writers never observe each other's value.
    print(f"Thread {threading.current_thread().name}: {local_data.value}")


threads = [
    threading.Thread(target=worker, args=(n,), name=f"Worker-{n}")
    for n in range(3)
]

for t in threads:
    t.start()
for t in threads:
    t.join()

Daemon Threads

import threading
import time
 
def background_task():
    """Loop forever, printing a heartbeat once a second."""
    while True:
        print("Background work...")
        time.sleep(1)


# daemon=True: the interpreter exits without waiting for this thread.
t = threading.Thread(target=background_task, daemon=True)
t.start()

time.sleep(3)
print("Main thread exiting")
# The daemon thread is torn down abruptly at interpreter shutdown.
# Daemon thread automatically killed

Real-World: Concurrent API Client

from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
import time
 
class APIClient:
    """Fan API calls out over a thread pool with a concurrency cap.

    ``max_concurrent`` bounds the pool's worker threads; ``rate_limit``
    bounds simultaneously in-flight requests via a semaphore.
    """

    def __init__(self, max_concurrent=10, rate_limit=100):
        self.executor = ThreadPoolExecutor(max_workers=max_concurrent)
        self.semaphore = threading.Semaphore(rate_limit)
        # NOTE(review): results/lock are currently unused by any method;
        # kept for backward compatibility with external readers.
        self.results = []
        self.lock = threading.Lock()

    def _fetch(self, endpoint):
        """Perform one (simulated) API call under the rate-limit semaphore."""
        with self.semaphore:
            # Simulate API call latency
            time.sleep(0.1)
            return {"endpoint": endpoint, "status": "ok"}

    def fetch_all(self, endpoints):
        """Fetch every endpoint concurrently; return one result dict per endpoint.

        Failed calls yield ``{"endpoint": ..., "error": ...}`` instead of
        raising, so one bad endpoint cannot sink the whole batch.
        """
        futures = {
            self.executor.submit(self._fetch, ep): ep
            for ep in endpoints
        }

        results = []
        for future in as_completed(futures):
            try:
                results.append(future.result())
            except Exception as e:
                # Bug fix: the future->endpoint map was built but never used,
                # so error entries could not be attributed to a request.
                results.append({"endpoint": futures[future], "error": str(e)})

        return results

    def shutdown(self):
        """Wait for in-flight work and release the pool's threads."""
        self.executor.shutdown(wait=True)
 
# Usage: push 50 endpoints through one client, then tear the pool down.
client = APIClient()
endpoints = [f"/api/resource/{n}" for n in range(50)]
results = client.fetch_all(endpoints)
client.shutdown()

When to Use Threading vs Multiprocessing

| Use Case     | Threading           | Multiprocessing   |
|--------------|---------------------|-------------------|
| IO-bound     | Excellent           | Works, but heavier |
| CPU-bound    | Limited by the GIL  | True parallelism  |
| Shared state | Easy                | Complex           |
| Memory       | Shared              | Separate          |
| Overhead     | Low                 | High              |

Threading shines for concurrent IO. For CPU parallelism, reach for multiprocessing.

React to this post: