Beyond simple command execution, subprocess enables Unix-style piping between processes.
Basic Pipe
import subprocess

# Capture output: run the command to completion and collect stdout/stderr
# as text (capture_output=True wires both streams to pipes for us).
result = subprocess.run(
    ['ls', '-la'],
    capture_output=True,
    text=True
)
print(result.stdout)

Piping Between Commands
import subprocess

# ls | grep ".py" — connect ls's stdout directly to grep's stdin.
ls = subprocess.Popen(
    ['ls', '-la'],
    stdout=subprocess.PIPE
)
grep = subprocess.Popen(
    ['grep', '.py'],
    stdin=ls.stdout,
    stdout=subprocess.PIPE,
    text=True
)
ls.stdout.close()  # Allow ls to receive SIGPIPE if grep exits early
output, _ = grep.communicate()
ls.wait()  # Reap ls so it doesn't linger as a zombie
print(output)

Chain Multiple Commands
import subprocess

def pipe(*commands):
    """Chain commands with pipes, like ``cmd1 | cmd2 | ...`` in a shell.

    Args:
        commands: One or more argv lists, e.g. ``['grep', 'error']``.

    Returns:
        The decoded stdout of the last command in the chain.

    Raises:
        ValueError: If no commands are given.
    """
    if not commands:
        raise ValueError("pipe() requires at least one command")
    procs = []
    for cmd in commands:
        # First command reads the parent's stdin; the rest read the
        # previous command's stdout.
        stdin = procs[-1].stdout if procs else None
        proc = subprocess.Popen(
            cmd,
            stdin=stdin,
            stdout=subprocess.PIPE,
            text=True
        )
        if procs:
            # Close our handle so the upstream process gets SIGPIPE
            # if the downstream one exits early.
            procs[-1].stdout.close()
        procs.append(proc)
    output, _ = procs[-1].communicate()
    # Wait for all upstream processes to avoid zombies.
    for proc in procs[:-1]:
        proc.wait()
    return output
# cat file | grep pattern | sort | uniq
result = pipe(
['cat', 'data.txt'],
['grep', 'error'],
['sort'],
['uniq', '-c']
)

Streaming Output
import subprocess

# Stream line by line: iterating proc.stdout yields lines as the
# child writes them, so we can react without waiting for exit.
proc = subprocess.Popen(
    ['tail', '-f', '/var/log/syslog'],
    stdout=subprocess.PIPE,
    text=True
)
for line in proc.stdout:
    print(f"LOG: {line.strip()}")
    if 'error' in line.lower():
        break
proc.terminate()

Feed Input to Process
import subprocess

# Echo input through process: communicate() writes the input string to
# the child's stdin, closes it, and returns (stdout, stderr).
proc = subprocess.Popen(
    ['sort'],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    text=True
)
output, _ = proc.communicate(input='banana\napple\ncherry\n')
print(output) # apple\nbanana\ncherry\n

Bidirectional Communication
import subprocess

# Line-oriented request/response with a child process. bufsize=1 with
# text=True gives line buffering, so each flushed line is visible to
# the child immediately.
proc = subprocess.Popen(
    ['python3', '-c', 'while True: print(input().upper())'],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    text=True,
    bufsize=1  # Line buffered
)
proc.stdin.write('hello\n')
proc.stdin.flush()
resp1 = proc.stdout.readline()
print(resp1)  # HELLO
proc.stdin.write('world\n')
proc.stdin.flush()
resp2 = proc.stdout.readline()
print(resp2)  # WORLD
proc.terminate()

Timeout with Pipes
import subprocess

# NOTE: 'long_running_command' is an illustrative placeholder; Popen
# raises FileNotFoundError if it is not on PATH.
proc = subprocess.Popen(
    ['long_running_command'],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    text=True
)
try:
    stdout, stderr = proc.communicate(timeout=30)
except subprocess.TimeoutExpired:
    # Kill the child, then drain the pipes so it can actually exit.
    proc.kill()
    stdout, stderr = proc.communicate()
print("Process timed out")stderr Handling
import subprocess
# Capture separately
result = subprocess.run(
['ls', '/nonexistent'],
capture_output=True,
text=True
)
print(f"stdout: {result.stdout}")
print(f"stderr: {result.stderr}")
# Merge stderr into stdout
proc = subprocess.Popen(
['ls', '/nonexistent'],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True
)
output, _ = proc.communicate()
# Discard stderr
proc = subprocess.Popen(
['command'],
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL
)

Shell Pipes (Use Carefully)
import subprocess

# Using shell=True for complex pipes: the whole pipeline is a single
# shell command string, so the shell handles the plumbing.
result = subprocess.run(
    'cat data.txt | grep error | wc -l',
    shell=True,
    capture_output=True,
    text=True
)
print(result.stdout)
# DANGER: Never use shell=True with user input
# user_input = "; rm -rf /" # Malicious!
# subprocess.run(f'echo {user_input}', shell=True) # BAD!

Process Substitution
import subprocess
import tempfile
def process_substitution(cmd):
"""Simulate bash's <(command)."""
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE
)
return proc.stdout
# diff <(cmd1) <(cmd2)
with process_substitution(['sort', 'file1.txt']) as f1, \
process_substitution(['sort', 'file2.txt']) as f2:
diff = subprocess.run(
['diff', '/dev/fd/3', '/dev/fd/4'],
stdin=subprocess.DEVNULL,
pass_fds=(f1.fileno(), f2.fileno()),
capture_output=True
    )

Async Subprocess
import asyncio

async def run_command(cmd):
    """Run cmd asynchronously and return its decoded stdout."""
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )
    stdout, stderr = await proc.communicate()
    return stdout.decode()

async def main():
    """Run several commands concurrently and print a preview of each."""
    results = await asyncio.gather(
        run_command(['ls', '-la']),
        run_command(['df', '-h']),
        run_command(['uptime'])
    )
    for r in results:
        print(r[:100])
asyncio.run(main())

Context Manager Pattern
import subprocess
from contextlib import contextmanager

@contextmanager
def piped_process(cmd, **kwargs):
    """Yield a Popen with piped stdout/stderr.

    On exit the process is terminated and reaped, so callers can
    break out of a read loop without leaking children.
    """
    proc = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        **kwargs
    )
    try:
        yield proc
    finally:
        proc.terminate()
        proc.wait()

with piped_process(['tail', '-f', 'log.txt'], text=True) as proc:
    for line in proc.stdout:
        if 'done' in line:
            break
        print(line.strip())

Real-Time Output
import subprocess
import sys

# Stream output as it happens: readline() returns '' only at EOF, so
# iter(..., '') yields each line the moment the child emits it.
proc = subprocess.Popen(
    ['ping', '-c', '5', 'google.com'],
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,
    text=True,
    bufsize=1
)
for line in iter(proc.stdout.readline, ''):
    sys.stdout.write(line)
    sys.stdout.flush()
proc.wait()

Practical: Log Processor
import subprocess
from collections import Counter

def count_log_levels(log_file):
    """Count ERROR, WARN, INFO occurrences in log_file.

    Streams grep's output line by line so large logs never need to
    fit in memory. Returns a dict mapping level name -> count.
    """
    grep = subprocess.Popen(
        ['grep', '-oE', '(ERROR|WARN|INFO)', log_file],
        stdout=subprocess.PIPE,
        text=True
    )
    counts = Counter()
    for line in grep.stdout:
        counts[line.strip()] += 1
    grep.wait()
    return dict(counts)
# Or pure Python pipe
def pipe_count(log_file):
    """Equivalent of ``cat f | grep -oE 'ERROR|WARN|INFO' | sort | uniq -c``.

    Builds the pipeline with explicit Popen stages and returns uniq's
    raw text output (count-prefixed lines).
    """
    cat = subprocess.Popen(['cat', log_file], stdout=subprocess.PIPE)
    grep = subprocess.Popen(
        ['grep', '-oE', 'ERROR|WARN|INFO'],
        stdin=cat.stdout,
        stdout=subprocess.PIPE,
        text=True
    )
    sort = subprocess.Popen(
        ['sort'],
        stdin=grep.stdout,
        stdout=subprocess.PIPE,
        text=True
    )
    uniq = subprocess.Popen(
        ['uniq', '-c'],
        stdin=sort.stdout,
        stdout=subprocess.PIPE,
        text=True
    )
    # Close our handles so upstream stages see SIGPIPE on early exit.
    cat.stdout.close()
    grep.stdout.close()
    sort.stdout.close()
    output, _ = uniq.communicate()
    return output
Summary

subprocess pipe patterns:
- Basic pipe: `stdout=subprocess.PIPE`
- Chain commands: connect stdout → stdin
- Stream output: iterate `proc.stdout`
- Bidirectional: use `bufsize=1` for line buffering
- Error handling: `stderr=PIPE`, `STDOUT`, or `DEVNULL`
- Async: `asyncio.create_subprocess_exec()`

Avoid shell=True with untrusted input. Use explicit pipes for safety.
React to this post: