Python's queue module provides thread-safe queue implementations for multi-threaded programming. It is the backbone of producer-consumer patterns, where threads need a safe way to hand data to one another.

Basic Queue (FIFO)

import queue
import threading
import time
 
def producer(q, items):
    for item in items:
        print(f"Producing: {item}")
        q.put(item)
        time.sleep(0.1)
    q.put(None)  # Sentinel to signal done
 
def consumer(q):
    while True:
        item = q.get()
        if item is None:
            break
        print(f"Consuming: {item}")
        q.task_done()
 
q = queue.Queue()
 
producer_thread = threading.Thread(target=producer, args=(q, range(5)))
consumer_thread = threading.Thread(target=consumer, args=(q,))
 
producer_thread.start()
consumer_thread.start()
 
producer_thread.join()
consumer_thread.join()

Queue Types

Queue (FIFO)

First-in, first-out. Standard queue behavior:

q = queue.Queue()
q.put(1)
q.put(2)
q.put(3)
 
print(q.get())  # 1
print(q.get())  # 2
print(q.get())  # 3

LifoQueue (Stack)

Last-in, first-out. Like a stack:

q = queue.LifoQueue()
q.put(1)
q.put(2)
q.put(3)
 
print(q.get())  # 3
print(q.get())  # 2
print(q.get())  # 1

PriorityQueue

Items come out in ascending priority order (lowest value first):

q = queue.PriorityQueue()
q.put((3, "low priority"))
q.put((1, "high priority"))
q.put((2, "medium priority"))
 
print(q.get())  # (1, 'high priority')
print(q.get())  # (2, 'medium priority')
print(q.get())  # (3, 'low priority')
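One wrinkle with tuple entries: when two items share a priority, tuple comparison falls through to the payloads, and non-comparable payloads (dicts, custom objects) raise TypeError. A minimal sketch of the common fix, an insertion counter as a tie-breaker (the job dicts here are placeholder payloads):

```python
import itertools
import queue

q = queue.PriorityQueue()
counter = itertools.count()  # monotonically increasing tie-breaker

# Dict payloads are not comparable, so two entries with equal
# priority would raise TypeError without the counter in the middle.
q.put((1, next(counter), {"job": "first"}))
q.put((1, next(counter), {"job": "second"}))

priority, _, item = q.get()
print(item)  # {'job': 'first'} — ties resolve in insertion order
```

The counter also guarantees FIFO ordering among items of equal priority, which plain tuples do not.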

Blocking and Timeouts

import queue
 
q = queue.Queue(maxsize=2)  # Bounded queue
 
# put() blocks if queue is full
q.put("a")
q.put("b")
# q.put("c")  # Would block forever
 
# Non-blocking put
try:
    q.put("c", block=False)
except queue.Full:
    print("Queue is full!")
 
# put() with timeout
try:
    q.put("c", timeout=1.0)
except queue.Full:
    print("Timed out waiting")
 
# get() blocks if queue is empty
item = q.get()
 
# Non-blocking get
try:
    item = q.get(block=False)
except queue.Empty:
    print("Queue is empty!")
 
# get() with timeout
try:
    item = q.get(timeout=1.0)
except queue.Empty:
    print("Timed out waiting")

task_done() and join()

Track when all work is complete:

import queue
import threading
 
def worker(q):
    while True:
        item = q.get()
        if item is None:
            break
        print(f"Processing {item}")
        q.task_done()  # Signal this item is done
 
q = queue.Queue()
 
# Start workers
threads = []
for _ in range(3):
    t = threading.Thread(target=worker, args=(q,))
    t.start()
    threads.append(t)
 
# Add work
for item in range(10):
    q.put(item)
 
# Wait for all items to be processed
q.join()
print("All items processed!")
 
# Stop workers
for _ in range(3):
    q.put(None)
for t in threads:
    t.join()

Queue Size

q = queue.Queue()
q.put(1)
q.put(2)
 
print(q.qsize())   # 2 (approximate)
print(q.empty())   # False
print(q.full())    # False (unbounded queue)

Note: qsize(), empty(), and full() are approximate — another thread can modify the queue between the check and whatever you do next, so their answers may already be stale.
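Because of that, a check-then-act sequence like `if not q.empty(): q.get()` can still block or raise when another thread drains the queue in between. A small sketch of the safer pattern — attempt the operation and catch the exception instead:

```python
import queue

q = queue.Queue()

# Racy in multithreaded code: the queue can be drained between
# the empty() check and the get() call.
if not q.empty():
    item = q.get()

# Safer: try the operation and handle the failure.
try:
    item = q.get(block=False)
except queue.Empty:
    item = None  # nothing available right now
print(item)  # None (the queue started empty)
```

The same applies to full() versus catching queue.Full on a non-blocking put().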

Real-World Example: Worker Pool

import queue
import threading
import time
import random
 
def worker(work_queue, results_queue, worker_id):
    while True:
        try:
            task = work_queue.get(timeout=1)
        except queue.Empty:
            continue
        
        if task is None:
            break
        
        # Simulate work
        result = task * 2
        time.sleep(random.uniform(0.1, 0.3))
        
        results_queue.put((worker_id, task, result))
        work_queue.task_done()
 
# Create queues
work_queue = queue.Queue()
results_queue = queue.Queue()
 
# Start workers
workers = []
for i in range(4):
    t = threading.Thread(
        target=worker, 
        args=(work_queue, results_queue, i)
    )
    t.start()
    workers.append(t)
 
# Submit work
for i in range(20):
    work_queue.put(i)
 
# Wait for completion
work_queue.join()
 
# Collect results
results = []
while not results_queue.empty():
    results.append(results_queue.get())
 
print(f"Processed {len(results)} items")
 
# Shutdown workers
for _ in workers:
    work_queue.put(None)
for t in workers:
    t.join()

SimpleQueue (Python 3.7+)

Unbounded FIFO queue without task tracking:

q = queue.SimpleQueue()
q.put(1)
q.put(2)
 
print(q.get())  # 1
print(q.empty())  # False

Simpler API: there is no task_done() or join().

Tips

  1. Always use sentinels (like None) to signal shutdown
  2. Bounded queues prevent memory issues with slow consumers
  3. Call task_done() exactly once per get(), or join() will block forever (extra calls raise ValueError)
  4. PriorityQueue items must be comparable; use (priority, item) tuples, and add a tie-breaker when priorities can repeat
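For tip 4, one way to avoid hand-building tuples is a dataclass that compares only on its priority field, a pattern shown in the queue module's own documentation (the PrioritizedItem name and payload dicts here are illustrative):

```python
from dataclasses import dataclass, field
from typing import Any
import queue

@dataclass(order=True)
class PrioritizedItem:
    priority: int
    item: Any = field(compare=False)  # payload is never compared

q = queue.PriorityQueue()
q.put(PrioritizedItem(2, {"task": "backup"}))
q.put(PrioritizedItem(1, {"task": "deploy"}))

first = q.get()
print(first.item)  # {'task': 'deploy'} — lowest priority value wins
```

Because the payload field is excluded from comparison, equal priorities never trigger a TypeError on non-comparable payloads.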

Summary

The queue module makes thread coordination safe and simple. Use Queue for FIFO, LifoQueue for stacks, PriorityQueue for prioritized work. The put()/get() blocking behavior and task_done()/join() completion tracking handle the hard parts of producer-consumer patterns.
