Threading in Python is ideal for IO-bound tasks. The GIL limits CPU parallelism, but threads excel at waiting on network, disk, and other IO operations.
Basic Thread Creation
import threading
import time
def worker(name, duration):
    """Announce start, sleep for *duration* seconds, then announce completion."""
    print(f"Thread {name} starting")
    time.sleep(duration)
    print(f"Thread {name} done")
# Spawn three workers, starting each as it is created, then block
# until every one has finished.
threads = []
for idx in range(3):
    thread = threading.Thread(target=worker, args=(idx, 1))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()
print("All threads complete")

Thread Pool with concurrent.futures
from concurrent.futures import ThreadPoolExecutor, as_completed
import urllib.request
def fetch_url(url):
    """Download *url* and return a (url, payload size in bytes) pair."""
    with urllib.request.urlopen(url, timeout=5) as response:
        payload = response.read()
    return url, len(payload)
# Targets for the concurrent download demo.
urls = ["https://python.org", "https://pypi.org", "https://docs.python.org"]
with ThreadPoolExecutor(max_workers=5) as executor:
futures = {executor.submit(fetch_url, url): url for url in urls}
for future in as_completed(futures):
url = futures[future]
try:
url, size = future.result()
print(f"{url}: {size} bytes")
except Exception as e:
print(f"{url}: Error - {e}")Lock for Shared State
import threading
class Counter:
    """Integer counter whose updates are serialized by a lock."""

    def __init__(self):
        self.value = 0
        self.lock = threading.Lock()

    def increment(self):
        """Add one to the counter atomically."""
        with self.lock:
            self.value += 1

    def get(self):
        """Return the current count under the lock."""
        with self.lock:
            return self.value
counter = Counter()

def _bump_1000():
    # A plain loop for side effects: the original wrapped a list
    # comprehension in a lambda, building and discarding a 1000-element
    # list on every run (an anti-idiom).
    for _ in range(1000):
        counter.increment()

threads = [threading.Thread(target=_bump_1000) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter.get()) # 10000

RLock for Nested Locking
import threading
class Account:
def __init__(self, balance):
self.balance = balance
self.lock = threading.RLock() # Reentrant lock
def withdraw(self, amount):
with self.lock:
if self.balance >= amount:
self.balance -= amount
return True
return False
def transfer_to(self, other, amount):
with self.lock:
if self.withdraw(amount): # Same lock, nested
other.deposit(amount)
return True
return False
def deposit(self, amount):
with self.lock:
self.balance += amountEvent for Signaling
import threading
import time
event = threading.Event()  # one-shot signal shared by every waiter

def waiter(name):
    """Block until the shared event fires, then continue."""
    print(f"{name} waiting for signal...")
    event.wait()
    print(f"{name} proceeding!")

def signaler():
    """Fire the shared event after a two-second pause."""
    time.sleep(2)
    print("Sending signal")
    event.set()
threads = [
threading.Thread(target=waiter, args=("A",)),
threading.Thread(target=waiter, args=("B",)),
threading.Thread(target=signaler),
]
for t in threads:
t.start()
for t in threads:
t.join()Condition for Complex Coordination
from collections import deque
import threading
import time
class BoundedBuffer:
    """Fixed-capacity FIFO queue for producer/consumer threads.

    put() blocks while the buffer is full; get() blocks while it is
    empty.  A single Condition guards the buffer and wakes the other
    side whenever the contents change.
    """

    def __init__(self, capacity):
        # deque pops from the left in O(1); the original list.pop(0)
        # shifted every remaining element on each get (O(n)).
        self.buffer = deque()
        self.capacity = capacity
        self.condition = threading.Condition()

    def put(self, item):
        """Append *item*, waiting while the buffer is full."""
        with self.condition:
            # wait_for re-checks the predicate on every wakeup,
            # replacing the manual while/wait loop.
            self.condition.wait_for(lambda: len(self.buffer) < self.capacity)
            self.buffer.append(item)
            self.condition.notify()

    def get(self):
        """Pop and return the oldest item, waiting while empty."""
        with self.condition:
            self.condition.wait_for(lambda: len(self.buffer) > 0)
            item = self.buffer.popleft()
            self.condition.notify()
            return item
buffer = BoundedBuffer(5)

def producer():
    """Push ten integers into the shared buffer."""
    for i in range(10):
        buffer.put(i)
        print(f"Produced: {i}")

def consumer():
    """Drain ten items, pausing briefly after each one."""
    for _ in range(10):
        item = buffer.get()
        print(f"Consumed: {item}")
        time.sleep(0.1)

threading.Thread(target=producer).start()
threading.Thread(target=consumer).start()

Semaphore for Rate Limiting
import threading
import time
semaphore = threading.Semaphore(3)  # admits at most three tasks at once

def limited_task(task_id):
    """Run a one-second task, but only while a semaphore slot is free."""
    with semaphore:
        print(f"Task {task_id} running")
        time.sleep(1)
        print(f"Task {task_id} done")
threads = [
threading.Thread(target=limited_task, args=(i,))
for i in range(10)
]
for t in threads:
t.start()
for t in threads:
t.join()Barrier for Synchronization Points
import threading
import time
import random
barrier = threading.Barrier(3)  # releases only once three threads arrive

def worker(worker_id):
    """Do random-length phase-1 work, then rendezvous before phase 2."""
    # Phase 1: independent work of random duration.
    time.sleep(random.uniform(0.5, 1.5))
    print(f"Worker {worker_id} finished phase 1")
    # Nobody proceeds until all three workers have arrived here.
    barrier.wait()
    # Phase 2: everyone continues together.
    print(f"Worker {worker_id} starting phase 2")
threads = [
threading.Thread(target=worker, args=(i,))
for i in range(3)
]
for t in threads:
t.start()
for t in threads:
t.join()Timer for Delayed Execution
import threading
def delayed_task():
    """Payload for the Timer demo."""
    print("Task executed after delay")
# Schedule delayed_task to run on a fresh thread three seconds from now.
timer = threading.Timer(3.0, delayed_task)
timer.start()
# Can cancel before execution
# timer.cancel()

Thread-Local Data
import threading
local_data = threading.local()  # namespace with per-thread attribute storage

def worker(value):
    """Stash *value* in thread-local storage and echo it back."""
    local_data.value = value
    # Each thread sees only its own .value attribute.
    print(f"Thread {threading.current_thread().name}: {local_data.value}")
threads = [
threading.Thread(target=worker, args=(i,), name=f"Worker-{i}")
for i in range(3)
]
for t in threads:
t.start()
for t in threads:
t.join()Daemon Threads
import threading
import time
def background_task():
    """Print a heartbeat once a second, forever (ended only by process exit)."""
    while True:
        print("Background work...")
        time.sleep(1)
# Daemon threads don't keep the process alive: once the main thread
# returns, the interpreter exits and the daemon dies with it.
t = threading.Thread(target=background_task, daemon=True)
t.start()
time.sleep(3)
print("Main thread exiting")
# Daemon thread automatically killed

Real-World: Concurrent API Client
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
import time
class APIClient:
    """Fan API calls out over a bounded thread pool.

    ``max_concurrent`` sets the pool's worker-thread count;
    ``rate_limit`` is a semaphore cap on simultaneous in-flight calls.
    """

    def __init__(self, max_concurrent=10, rate_limit=100):
        self.executor = ThreadPoolExecutor(max_workers=max_concurrent)
        # NOTE(review): a Semaphore caps *concurrency*, not requests per
        # second, and with the defaults (10 workers vs 100 permits) it
        # never blocks — confirm the intended limiting strategy.
        self.semaphore = threading.Semaphore(rate_limit)
        self.results = []             # unused by fetch_all; kept for compatibility
        self.lock = threading.Lock()  # unused by fetch_all; kept for compatibility

    def _fetch(self, endpoint):
        """Fetch one endpoint while holding a semaphore permit."""
        with self.semaphore:
            # Simulate API call latency.
            time.sleep(0.1)
            return {"endpoint": endpoint, "status": "ok"}

    def fetch_all(self, endpoints):
        """Fetch every endpoint concurrently; return results as they finish.

        Failures are captured, not raised.  Fix: the error dict now
        carries the failing endpoint — the previous bare {"error": ...}
        could not be attributed to any request.
        """
        futures = {
            self.executor.submit(self._fetch, ep): ep
            for ep in endpoints
        }
        results = []
        for future in as_completed(futures):
            try:
                results.append(future.result())
            except Exception as e:
                results.append({"endpoint": futures[future], "error": str(e)})
        return results

    def shutdown(self):
        """Block until queued work drains, then release the pool's threads."""
        self.executor.shutdown(wait=True)
# Usage: push fifty endpoints through one shared client.
client = APIClient()
endpoints = [f"/api/resource/{i}" for i in range(50)]
results = client.fetch_all(endpoints)
client.shutdown()

When to Use Threading vs Multiprocessing
| Use Case | Threading | Multiprocessing |
|---|---|---|
| IO-bound | ✅ | ❌ |
| CPU-bound | ❌ | ✅ |
| Shared state | Easy | Complex |
| Memory | Shared | Separate |
| Overhead | Low | High |
Threading shines for concurrent IO. For CPU parallelism, reach for multiprocessing.
React to this post: