Python's queue module provides thread-safe queue implementations for multi-threaded programming. It's the backbone of producer-consumer patterns where threads need to safely pass data between each other.
Basic Queue (FIFO)
import queue
import threading
import time
def producer(q, items):
for item in items:
print(f"Producing: {item}")
q.put(item)
time.sleep(0.1)
q.put(None) # Sentinel to signal done
def consumer(q):
while True:
item = q.get()
if item is None:
break
print(f"Consuming: {item}")
q.task_done()
q = queue.Queue()
producer_thread = threading.Thread(target=producer, args=(q, range(5)))
consumer_thread = threading.Thread(target=consumer, args=(q,))
producer_thread.start()
consumer_thread.start()
producer_thread.join()
consumer_thread.join()Queue Types
Queue (FIFO)
First-in, first-out. Standard queue behavior:
q = queue.Queue()
q.put(1)
q.put(2)
q.put(3)
print(q.get()) # 1
print(q.get()) # 2
print(q.get()) # 3LifoQueue (Stack)
Last-in, first-out. Like a stack:
q = queue.LifoQueue()
q.put(1)
q.put(2)
q.put(3)
print(q.get()) # 3
print(q.get()) # 2
print(q.get()) # 1PriorityQueue
Items come out in priority order (lowest first):
q = queue.PriorityQueue()
q.put((3, "low priority"))
q.put((1, "high priority"))
q.put((2, "medium priority"))
print(q.get()) # (1, 'high priority')
print(q.get()) # (2, 'medium priority')
print(q.get()) # (3, 'low priority')Blocking and Timeouts
import queue
q = queue.Queue(maxsize=2) # Bounded queue
# put() blocks if queue is full
q.put("a")
q.put("b")
# q.put("c") # Would block forever
# Non-blocking put
try:
q.put("c", block=False)
except queue.Full:
print("Queue is full!")
# put() with timeout
try:
q.put("c", timeout=1.0)
except queue.Full:
print("Timed out waiting")
# get() blocks if queue is empty
item = q.get()
# Non-blocking get
try:
item = q.get(block=False)
except queue.Empty:
print("Queue is empty!")
# get() with timeout
try:
item = q.get(timeout=1.0)
except queue.Empty:
print("Timed out waiting")task_done() and join()
Track when all work is complete:
import queue
import threading
def worker(q):
while True:
item = q.get()
if item is None:
break
print(f"Processing {item}")
q.task_done() # Signal this item is done
q = queue.Queue()
# Start workers
threads = []
for _ in range(3):
t = threading.Thread(target=worker, args=(q,))
t.start()
threads.append(t)
# Add work
for item in range(10):
q.put(item)
# Wait for all items to be processed
q.join()
print("All items processed!")
# Stop workers
for _ in range(3):
q.put(None)
for t in threads:
t.join()Queue Size
q = queue.Queue()
q.put(1)
q.put(2)
print(q.qsize()) # 2 (approximate)
print(q.empty()) # False
print(q.full()) # False (unbounded queue)Note: qsize(), empty(), and full() are approximate due to threading.
Real-World Example: Worker Pool
import queue
import threading
import time
import random
def worker(work_queue, results_queue, worker_id):
while True:
try:
task = work_queue.get(timeout=1)
except queue.Empty:
continue
if task is None:
break
# Simulate work
result = task * 2
time.sleep(random.uniform(0.1, 0.3))
results_queue.put((worker_id, task, result))
work_queue.task_done()
# Create queues
work_queue = queue.Queue()
results_queue = queue.Queue()
# Start workers
workers = []
for i in range(4):
t = threading.Thread(
target=worker,
args=(work_queue, results_queue, i)
)
t.start()
workers.append(t)
# Submit work
for i in range(20):
work_queue.put(i)
# Wait for completion
work_queue.join()
# Collect results
results = []
while not results_queue.empty():
results.append(results_queue.get())
print(f"Processed {len(results)} items")
# Shutdown workers
for _ in workers:
work_queue.put(None)
for t in workers:
t.join()SimpleQueue (Python 3.7+)
Unbounded FIFO queue without task tracking:
q = queue.SimpleQueue()
q.put(1)
q.put(2)
print(q.get()) # 1
print(q.empty()) # FalseSimpler API—no task_done() or join().
Tips
- Always use sentinels (like
None) to signal shutdown - Bounded queues prevent memory issues with slow consumers
- task_done() must match get() calls for join() to work
- PriorityQueue items must be comparable (use tuples)
Summary
The queue module makes thread coordination safe and simple. Use Queue for FIFO, LifoQueue for stacks, PriorityQueue for prioritized work. The put()/get() blocking behavior and task_done()/join() completion tracking handle the hard parts of producer-consumer patterns.