Python Multiprocessing and Multithreading: A Comprehensive Guide
Introduction
Python provides two primary approaches for concurrent execution: multithreading and multiprocessing. Understanding when and how to use each is crucial for writing efficient Python applications.
- Multithreading: Multiple threads within a single process sharing memory space
- Multiprocessing: Multiple separate processes, each with its own memory space
Understanding Concurrency vs Parallelism
Concurrency
Concurrency is about dealing with multiple tasks at once, but not necessarily executing them simultaneously. Tasks may be interleaved or switched between rapidly.
Parallelism
Parallelism is about executing multiple tasks simultaneously, typically on multiple CPU cores.
# Concurrent execution (may not be parallel)
import threading
import time
def task(name):
    for i in range(3):
        print(f"Task {name}: {i}")
        time.sleep(0.1)

# Create threads
t1 = threading.Thread(target=task, args=("A",))
t2 = threading.Thread(target=task, args=("B",))

# Start threads
t1.start()
t2.start()

# Wait for completion
t1.join()
t2.join()
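By contrast, a minimal sketch of parallel execution runs the same task in separate processes, which the operating system can schedule on different CPU cores:

# Parallel execution (separate processes, may run on separate cores)
import multiprocessing
import time

def task(name):
    for i in range(3):
        print(f"Task {name}: {i}")
        time.sleep(0.1)

if __name__ == "__main__":
    p1 = multiprocessing.Process(target=task, args=("A",))
    p2 = multiprocessing.Process(target=task, args=("B",))

    p1.start()
    p2.start()

    p1.join()
    p2.join()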
The Global Interpreter Lock (GIL)
The GIL is a mutex that protects access to Python objects, preventing multiple threads from executing Python bytecode simultaneously. This has important implications:
GIL Impact
- CPU-bound tasks: Multithreading provides little benefit because threads take turns holding the GIL (see the timing sketch below)
- I/O-bound tasks: Multithreading can be effective because the GIL is released during blocking I/O
- Multiprocessing: Bypasses GIL limitations by running work in separate processes, each with its own interpreter and GIL
When the GIL is Released
- File I/O operations
- Network I/O operations
- Some image-processing operations (PIL/Pillow)
- Many NumPy operations on large arrays
- time.sleep() calls
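A quick way to observe the GIL's effect is to time a CPU-bound function run twice sequentially versus in two threads. On standard CPython the two versions finish in roughly the same time; this is a minimal sketch, and exact timings vary by machine and Python version:

import threading
import time

def count_down(n):
    # Pure-Python CPU-bound loop; holds the GIL while running
    while n > 0:
        n -= 1

N = 10_000_000

# Sequential: two calls back to back
start = time.time()
count_down(N)
count_down(N)
print(f"Sequential: {time.time() - start:.2f}s")

# Threaded: two threads, but the GIL serializes the bytecode
start = time.time()
t1 = threading.Thread(target=count_down, args=(N,))
t2 = threading.Thread(target=count_down, args=(N,))
t1.start()
t2.start()
t1.join()
t2.join()
print(f"Two threads: {time.time() - start:.2f}s")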
Multithreading with the threading Module
Basic Thread Creation
import threading
import time
# Method 1: Using Thread class directly
def worker_function(name, delay):
    for i in range(5):
        print(f"Worker {name}: {i}")
        time.sleep(delay)

# Create and start threads
thread1 = threading.Thread(target=worker_function, args=("A", 0.5))
thread2 = threading.Thread(target=worker_function, args=("B", 0.3))

thread1.start()
thread2.start()

thread1.join()
thread2.join()
Thread Subclassing
import threading
import time
class WorkerThread(threading.Thread):
    def __init__(self, name, delay):
        super().__init__()
        self.name = name
        self.delay = delay

    def run(self):
        for i in range(5):
            print(f"Worker {self.name}: {i}")
            time.sleep(self.delay)

# Create and start threads
worker1 = WorkerThread("A", 0.5)
worker2 = WorkerThread("B", 0.3)

worker1.start()
worker2.start()

worker1.join()
worker2.join()
Thread Pool Executor
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
def task(name, duration):
    print(f"Starting task {name}")
    time.sleep(duration)
    return f"Task {name} completed"

# Using ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=3) as executor:
    # Submit tasks
    futures = [
        executor.submit(task, "A", 2),
        executor.submit(task, "B", 1),
        executor.submit(task, "C", 3)
    ]

    # Collect results as they complete
    for future in as_completed(futures):
        result = future.result()
        print(result)
Thread-Safe Operations
import threading
import time
class ThreadSafeCounter:
    def __init__(self):
        self.value = 0
        self.lock = threading.Lock()

    def increment(self):
        with self.lock:
            temp = self.value
            time.sleep(0.001)  # Simulate processing
            self.value = temp + 1

    def get_value(self):
        with self.lock:
            return self.value

# Demonstrate thread safety
counter = ThreadSafeCounter()

def worker():
    for _ in range(100):
        counter.increment()

threads = []
for _ in range(10):
    t = threading.Thread(target=worker)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"Final counter value: {counter.get_value()}")
Multiprocessing with the multiprocessing Module
Basic Process Creation
import multiprocessing
import time
import os
def worker_function(name, delay):
    process_id = os.getpid()
    for i in range(5):
        print(f"Worker {name} (PID: {process_id}): {i}")
        time.sleep(delay)

if __name__ == "__main__":
    # Create and start processes
    process1 = multiprocessing.Process(target=worker_function, args=("A", 0.5))
    process2 = multiprocessing.Process(target=worker_function, args=("B", 0.3))

    process1.start()
    process2.start()

    process1.join()
    process2.join()
Process Pool
import multiprocessing
import time
def compute_square(n):
    """CPU-intensive task"""
    return n * n

def compute_with_delay(n):
    """Simulate processing time"""
    time.sleep(0.1)
    return n * n

if __name__ == "__main__":
    numbers = list(range(1, 11))

    # Sequential execution
    start_time = time.time()
    sequential_results = [compute_with_delay(n) for n in numbers]
    sequential_time = time.time() - start_time

    # Parallel execution
    start_time = time.time()
    with multiprocessing.Pool(processes=4) as pool:
        parallel_results = pool.map(compute_with_delay, numbers)
    parallel_time = time.time() - start_time

    print(f"Sequential time: {sequential_time:.2f} seconds")
    print(f"Parallel time: {parallel_time:.2f} seconds")
    print(f"Speedup: {sequential_time/parallel_time:.2f}x")
Process Pool Executor
from concurrent.futures import ProcessPoolExecutor, as_completed
import time
def cpu_intensive_task(n):
    """Simulate CPU-intensive computation"""
    total = 0
    for i in range(n * 1000000):
        total += i
    return total

if __name__ == "__main__":
    tasks = [100, 200, 300, 400, 500]

    with ProcessPoolExecutor(max_workers=4) as executor:
        # Submit all tasks
        futures = [executor.submit(cpu_intensive_task, task) for task in tasks]

        # Collect results
        for i, future in enumerate(as_completed(futures)):
            result = future.result()
            print(f"Task {i+1} completed with result: {result}")
Communication Between Processes/Threads
Queues
import multiprocessing
import threading
import time
# Process Queue
def producer(queue, items):
    for item in items:
        queue.put(item)
        print(f"Produced: {item}")
        time.sleep(0.1)
    queue.put(None)  # Sentinel value

def consumer(queue):
    while True:
        item = queue.get()
        if item is None:
            break
        print(f"Consumed: {item}")
        time.sleep(0.2)

if __name__ == "__main__":
    # Process communication
    process_queue = multiprocessing.Queue()
    items = ['item1', 'item2', 'item3', 'item4']

    producer_process = multiprocessing.Process(target=producer, args=(process_queue, items))
    consumer_process = multiprocessing.Process(target=consumer, args=(process_queue,))

    producer_process.start()
    consumer_process.start()

    producer_process.join()
    consumer_process.join()
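For thread-to-thread communication, the standard library's queue.Queue fills the same role. Below is a minimal standalone sketch of the equivalent producer/consumer pattern using threads (the thread_producer/thread_consumer names are illustrative):

import queue
import threading
import time

def thread_producer(q, items):
    for item in items:
        q.put(item)
        print(f"Produced: {item}")
        time.sleep(0.1)
    q.put(None)  # Sentinel value

def thread_consumer(q):
    while True:
        item = q.get()
        if item is None:
            break
        print(f"Consumed: {item}")
        time.sleep(0.2)

thread_queue = queue.Queue()
items = ['item1', 'item2', 'item3', 'item4']

producer_thread = threading.Thread(target=thread_producer, args=(thread_queue, items))
consumer_thread = threading.Thread(target=thread_consumer, args=(thread_queue,))

producer_thread.start()
consumer_thread.start()

producer_thread.join()
consumer_thread.join()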
Pipes
import multiprocessing
import time
def sender(conn, messages):
    for msg in messages:
        conn.send(msg)
        print(f"Sent: {msg}")
        time.sleep(0.1)
    conn.close()

def receiver(conn):
    while True:
        try:
            msg = conn.recv()
            print(f"Received: {msg}")
        except EOFError:
            break

if __name__ == "__main__":
    parent_conn, child_conn = multiprocessing.Pipe()
    messages = ['Hello', 'World', 'From', 'Process']

    sender_process = multiprocessing.Process(target=sender, args=(child_conn, messages))
    receiver_process = multiprocessing.Process(target=receiver, args=(parent_conn,))

    sender_process.start()
    receiver_process.start()

    # Close the main process's copy of the sending end so the receiver
    # can see EOFError once the sender has closed its connection.
    child_conn.close()

    sender_process.join()
    receiver_process.join()
Synchronization Primitives
Locks
import threading
import time
# Thread Lock
shared_resource = 0
lock = threading.Lock()

def increment_with_lock():
    global shared_resource
    for _ in range(100000):
        with lock:
            shared_resource += 1

def increment_without_lock():
    global shared_resource
    for _ in range(100000):
        shared_resource += 1

# Demonstrate race condition
shared_resource = 0
threads = []
for _ in range(5):
    t = threading.Thread(target=increment_without_lock)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"Without lock: {shared_resource}")

# With lock
shared_resource = 0
threads = []
for _ in range(5):
    t = threading.Thread(target=increment_with_lock)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"With lock: {shared_resource}")
Semaphores
import threading
import time
# Semaphore to limit concurrent access
semaphore = threading.Semaphore(2)  # Allow 2 concurrent accesses

def access_resource(worker_id):
    with semaphore:
        print(f"Worker {worker_id} accessing resource")
        time.sleep(2)  # Simulate work
        print(f"Worker {worker_id} finished")

threads = []
for i in range(5):
    t = threading.Thread(target=access_resource, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()
Condition Variables
import threading
import time
import random
# Producer-Consumer with Condition
condition = threading.Condition()
buffer = []
MAX_SIZE = 5

def producer():
    for i in range(10):
        with condition:
            while len(buffer) >= MAX_SIZE:
                print("Buffer full, producer waiting...")
                condition.wait()
            item = f"item_{i}"
            buffer.append(item)
            print(f"Produced: {item}")
            condition.notify_all()
        time.sleep(random.uniform(0.1, 0.5))

def consumer(consumer_id):
    for _ in range(5):
        with condition:
            while not buffer:
                print(f"Consumer {consumer_id} waiting...")
                condition.wait()
            item = buffer.pop(0)
            print(f"Consumer {consumer_id} consumed: {item}")
            condition.notify_all()
        time.sleep(random.uniform(0.1, 0.5))

# Start producer and consumers
producer_thread = threading.Thread(target=producer)
consumer_threads = [threading.Thread(target=consumer, args=(i,)) for i in range(2)]

producer_thread.start()
for t in consumer_threads:
    t.start()

producer_thread.join()
for t in consumer_threads:
    t.join()
Performance Comparison
I/O-Bound Tasks
import time
import requests
import threading
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def fetch_url(url):
    """Simulate I/O-bound task"""
    try:
        response = requests.get(url, timeout=5)
        return f"Status: {response.status_code}"
    except requests.RequestException:
        return "Error"

def time_execution(func, *args, **kwargs):
    start = time.time()
    result = func(*args, **kwargs)
    end = time.time()
    return result, end - start

# Sequential execution
def sequential_fetch(urls):
    return [fetch_url(url) for url in urls]

# Threaded execution
def threaded_fetch(urls):
    with ThreadPoolExecutor(max_workers=10) as executor:
        return list(executor.map(fetch_url, urls))

# Process execution
def process_fetch(urls):
    with ProcessPoolExecutor(max_workers=10) as executor:
        return list(executor.map(fetch_url, urls))

if __name__ == "__main__":
    urls = ['https://httpbin.org/delay/1'] * 10

    # Compare performance
    _, seq_time = time_execution(sequential_fetch, urls)
    _, thread_time = time_execution(threaded_fetch, urls)
    _, process_time = time_execution(process_fetch, urls)

    print(f"Sequential: {seq_time:.2f}s")
    print(f"Threading: {thread_time:.2f}s")
    print(f"Multiprocessing: {process_time:.2f}s")
CPU-Bound Tasks
import time
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def cpu_bound_task(n):
    """CPU-intensive computation"""
    total = 0
    for i in range(n):
        total += i ** 2
    return total

def compare_performance():
    numbers = [1000000] * 8

    # Sequential
    start = time.time()
    sequential_results = [cpu_bound_task(n) for n in numbers]
    sequential_time = time.time() - start

    # Threading
    start = time.time()
    with ThreadPoolExecutor(max_workers=8) as executor:
        thread_results = list(executor.map(cpu_bound_task, numbers))
    thread_time = time.time() - start

    # Multiprocessing
    start = time.time()
    with ProcessPoolExecutor(max_workers=8) as executor:
        process_results = list(executor.map(cpu_bound_task, numbers))
    process_time = time.time() - start

    print("CPU-bound task comparison:")
    print(f"Sequential: {sequential_time:.2f}s")
    print(f"Threading: {thread_time:.2f}s")
    print(f"Multiprocessing: {process_time:.2f}s")
    print(f"Process speedup: {sequential_time/process_time:.2f}x")

if __name__ == "__main__":
    compare_performance()
Best Practices
1. Choose the Right Approach
# For I/O-bound tasks: Use threading
import threading
from concurrent.futures import ThreadPoolExecutor
def io_bound_work():
    # File operations, network requests, database queries
    pass

# For CPU-bound tasks: Use multiprocessing
import multiprocessing
from concurrent.futures import ProcessPoolExecutor

def cpu_bound_work():
    # Mathematical computations, image processing, data analysis
    pass
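If the same code path needs to handle both kinds of work, the decision can be factored into a small helper. The sketch below is illustrative (the run_tasks name and io_bound flag are not from the original text); it simply switches the executor class based on the workload type:

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def run_tasks(func, items, io_bound=True, max_workers=4):
    # Threads for I/O-bound work; processes for CPU-bound work.
    # Note: func and items must be picklable when processes are used.
    executor_cls = ThreadPoolExecutor if io_bound else ProcessPoolExecutor
    with executor_cls(max_workers=max_workers) as executor:
        return list(executor.map(func, items))

# Example usage:
#   run_tasks(fetch_url, urls, io_bound=True)
#   run_tasks(cpu_bound_task, numbers, io_bound=False)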
2. Resource Management
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from contextlib import contextmanager

@contextmanager
def managed_thread_pool(max_workers):
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        yield executor

@contextmanager
def managed_process_pool(max_workers):
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        yield executor

# Usage (some_function and args are placeholders)
with managed_thread_pool(4) as executor:
    futures = [executor.submit(some_function, arg) for arg in args]
    results = [future.result() for future in futures]
3. Error Handling
import threading
import multiprocessing
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
def safe_worker(task_id):
    try:
        # Your work here
        result = f"Task {task_id} completed"
        return result
    except Exception as e:
        logging.error(f"Task {task_id} failed: {e}")
        return None

def execute_with_error_handling():
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(safe_worker, i) for i in range(10)]

        for future in as_completed(futures):
            try:
                result = future.result()
                if result:
                    print(result)
            except Exception as e:
                logging.error(f"Future failed: {e}")
4. Graceful Shutdown
import threading
import time
import signal
import sys
class GracefulWorker:
    def __init__(self):
        self.shutdown_event = threading.Event()
        self.threads = []

    def worker(self, worker_id):
        while not self.shutdown_event.is_set():
            print(f"Worker {worker_id} working...")
            time.sleep(1)
        print(f"Worker {worker_id} shutting down")

    def start_workers(self, num_workers):
        for i in range(num_workers):
            t = threading.Thread(target=self.worker, args=(i,))
            t.start()
            self.threads.append(t)

    def shutdown(self):
        print("Initiating graceful shutdown...")
        self.shutdown_event.set()
        for t in self.threads:
            t.join()
        print("All workers shut down")

# Usage
worker_manager = GracefulWorker()

def signal_handler(signum, frame):
    worker_manager.shutdown()
    sys.exit(0)

signal.signal(signal.SIGINT, signal_handler)
worker_manager.start_workers(3)

# Keep main thread alive
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    worker_manager.shutdown()
Advanced Topics
1. Custom Thread Pool
import threading
import queue
import time
class SimpleThreadPool:
    def __init__(self, num_workers):
        self.task_queue = queue.Queue()
        self.workers = []
        self.shutdown = False
        for _ in range(num_workers):
            worker = threading.Thread(target=self._worker)
            worker.start()
            self.workers.append(worker)

    def _worker(self):
        while not self.shutdown:
            try:
                task, args, kwargs = self.task_queue.get(timeout=1)
                if task is None:
                    break
                task(*args, **kwargs)
                self.task_queue.task_done()
            except queue.Empty:
                continue

    def submit(self, task, *args, **kwargs):
        self.task_queue.put((task, args, kwargs))

    def close(self):
        self.shutdown = True
        for _ in self.workers:
            self.task_queue.put((None, (), {}))
        for worker in self.workers:
            worker.join()

# Usage
def sample_task(name, delay):
    print(f"Task {name} starting")
    time.sleep(delay)
    print(f"Task {name} completed")

pool = SimpleThreadPool(3)
for i in range(5):
    pool.submit(sample_task, f"Task-{i}", 1)

time.sleep(6)
pool.close()
2. Async-style with Threading
import threading
import time
from concurrent.futures import ThreadPoolExecutor
class AsyncResult:
    def __init__(self, future):
        self.future = future

    def get(self, timeout=None):
        return self.future.result(timeout=timeout)

    def is_ready(self):
        return self.future.done()

class AsyncExecutor:
    def __init__(self, max_workers=4):
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    def submit(self, func, *args, **kwargs):
        future = self.executor.submit(func, *args, **kwargs)
        return AsyncResult(future)

    def map(self, func, iterable):
        return [self.submit(func, item) for item in iterable]

    def shutdown(self):
        self.executor.shutdown(wait=True)

# Usage
def long_running_task(n):
    time.sleep(n)
    return n * n

async_executor = AsyncExecutor(max_workers=3)

# Submit tasks
results = []
for i in range(1, 4):
    result = async_executor.submit(long_running_task, i)
    results.append(result)

# Wait for results
for i, result in enumerate(results):
    print(f"Task {i+1} result: {result.get()}")

async_executor.shutdown()
3. Process Pool with Initialization
import multiprocessing
import time
# Global variable for each process
process_data = None

def init_process(shared_data):
    global process_data
    process_data = shared_data
    print(f"Process {multiprocessing.current_process().name} initialized")

def worker_with_init(item):
    global process_data
    # Use the initialized data
    result = item * process_data
    return result

if __name__ == "__main__":
    shared_value = 10

    with multiprocessing.Pool(
        processes=4,
        initializer=init_process,
        initargs=(shared_value,)
    ) as pool:
        items = [1, 2, 3, 4, 5]
        results = pool.map(worker_with_init, items)
        print(f"Results: {results}")
Real-World Examples
1. Web Scraper
import requests
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urljoin, urlparse
import queue
class WebScraper:
    def __init__(self, max_workers=10):
        self.max_workers = max_workers
        self.session = requests.Session()
        self.results = []
        self.lock = threading.Lock()

    def fetch_url(self, url):
        try:
            response = self.session.get(url, timeout=10)
            response.raise_for_status()
            return {
                'url': url,
                'status': response.status_code,
                'content_length': len(response.content),
                'title': self._extract_title(response.text)
            }
        except Exception as e:
            return {
                'url': url,
                'error': str(e)
            }

    def _extract_title(self, html):
        # Simple title extraction
        start = html.find('<title>')
        if start == -1:
            return "No title"
        start += len('<title>')
        end = html.find('</title>', start)
        if end == -1:
            return "No title"
        return html[start:end].strip()

    def scrape_urls(self, urls):
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            future_to_url = {executor.submit(self.fetch_url, url): url for url in urls}

            for future in as_completed(future_to_url):
                result = future.result()
                with self.lock:
                    self.results.append(result)

        return self.results

# Usage
if __name__ == "__main__":
    urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/2',
        'https://httpbin.org/status/200',
        'https://httpbin.org/status/404'
    ]

    scraper = WebScraper(max_workers=4)
    results = scraper.scrape_urls(urls)

    for result in results:
        print(result)
2. File Processing Pipeline
import os
import threading
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import json
import time
class FileProcessor:
    def __init__(self, input_dir, output_dir, max_workers=4):
        # The instance is pickled when its bound method is sent to worker
        # processes, so only picklable attributes are stored here.
        self.input_dir = input_dir
        self.output_dir = output_dir
        self.max_workers = max_workers

    def process_file(self, filepath):
        """Process a single file"""
        try:
            with open(filepath, 'r') as f:
                data = json.load(f)

            # Simulate processing
            processed_data = {
                'original_file': filepath,
                'processed_at': time.time(),
                'record_count': len(data) if isinstance(data, list) else 1,
                'processing_time': 0.1
            }
            time.sleep(0.1)  # Simulate processing time

            # Write processed file
            output_filename = f"processed_{os.path.basename(filepath)}"
            output_path = os.path.join(self.output_dir, output_filename)

            with open(output_path, 'w') as f:
                json.dump(processed_data, f, indent=2)

            return {
                'input': filepath,
                'output': output_path,
                'status': 'success'
            }
        except Exception as e:
            return {
                'input': filepath,
                'error': str(e),
                'status': 'failed'
            }

    def process_directory(self):
        """Process all JSON files in the input directory"""
        json_files = []
        for root, dirs, files in os.walk(self.input_dir):
            for file in files:
                if file.endswith('.json'):
                    json_files.append(os.path.join(root, file))

        print(f"Found {len(json_files)} JSON files to process")

        # Process files in parallel
        with ProcessPoolExecutor(max_workers=self.max_workers) as executor:
            results = list(executor.map(self.process_file, json_files))

        return results

# Usage example
if __name__ == "__main__":
    # Create sample data
    os.makedirs('input_data', exist_ok=True)
    os.makedirs('output_data', exist_ok=True)

    # Create sample JSON files
    for i in range(5):
        sample_data = [{'id': j, 'value': j * 10} for j in range(100)]
        with open(f'input_data/sample_{i}.json', 'w') as f:
            json.dump(sample_data, f)

    # Process files
    processor = FileProcessor('input_data', 'output_data', max_workers=4)
    results = processor.process_directory()

    # Print results
    for result in results:
        print(result)
3. Real-time Data Processing
import threading
import queue
import time
import random
import json
from datetime import datetime
class DataProcessor:
    def __init__(self, num_workers=3):
        self.input_queue = queue.Queue()
        self.output_queue = queue.Queue()
        self.num_workers = num_workers
        self.workers = []
        self.running = False
        self.processed_count = 0
        self.lock = threading.Lock()

    def worker(self, worker_id):
        """Process data items from the queue"""
        while self.running:
            try:
                data = self.input_queue.get(timeout=1)
                if data is None:
                    break

                # Simulate processing
                processed_data = self.process_data(data, worker_id)
                self.output_queue.put(processed_data)

                with self.lock:
                    self.processed_count += 1

                self.input_queue.task_done()
            except queue.Empty:
                continue

    def process_data(self, data, worker_id):
        """Process individual data item"""
        # Simulate processing time
        time.sleep(random.uniform(0.1, 0.5))

        return {
            'worker_id': worker_id,
            'original_data': data,
            'processed_at': datetime.now().isoformat(),
            'result': data['value'] * 2 if 'value' in data else 'processed'
        }

    def start(self):
        """Start the worker threads"""
        self.running = True
        for i in range(self.num_workers):
            worker = threading.Thread(target=self.worker, args=(i,))
            worker.start()
            self.workers.append(worker)

    def stop(self):
        """Stop all worker threads"""
        self.running = False
        # Add sentinel values to wake up workers
        for _ in range(self.num_workers):
            self.input_queue.put(None)
        # Wait for workers to finish
        for worker in self.workers:
            worker.join()

    def add_data(self, data):
        """Add data to the processing queue"""
        self.input_queue.put(data)

    def get_result(self, timeout=None):
        """Get processed result"""
        try:
            return self.output_queue.get(timeout=timeout)
        except queue.Empty:
            return None

    def get_stats(self):
        """Get processing statistics"""
        return {
            'input_queue_size': self.input_queue.qsize(),
            'output_queue_size': self.output_queue.qsize(),
            'processed_count': self.processed_count,
            'active_workers': len([w for w in self.workers if w.is_alive()])
        }

# Usage example
if __name__ == "__main__":
    processor = DataProcessor(num_workers=3)
    processor.start()

    # Simulate data streaming
    def data_generator():
        for i in range(20):
            yield {'id': i, 'value': random.randint(1, 100)}
            time.sleep(0.1)

    # Add data to processor
    for data in data_generator():
        processor.add_data(data)
        print(f"Added data: {data}")

    # Collect results
    results = []
    start_time = time.time()
    while len(results) < 20 and time.time() - start_time < 30:
        result = processor.get_result(timeout=1)
        if result:
            results.append(result)
            print(f"Got result: {result}")

    # Shut down worker threads so the program can exit
    processor.stop()

    # Print statistics
    print(f"Final stats: {processor.get_stats()}")
Troubleshooting Common Issues
1. Race Conditions
import threading
import time
# Problem: Race condition
shared_counter = 0

def unsafe_increment():
    global shared_counter
    for _ in range(100000):
        shared_counter += 1  # This is not atomic!

# Solution: Use locks
safe_counter = 0
counter_lock = threading.Lock()

def safe_increment():
    global safe_counter
    for _ in range(100000):
        with counter_lock:
            safe_counter += 1

# Alternative: Encapsulate the lock in a counter class
class AtomicCounter:
    def __init__(self):
        self._value = 0
        self._lock = threading.Lock()

    def increment(self):
        with self._lock:
            self._value += 1

    @property
    def value(self):
        with self._lock:
            return self._value

# Usage
atomic_counter = AtomicCounter()

def worker():
    for _ in range(100000):
        atomic_counter.increment()

threads = [threading.Thread(target=worker) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Atomic counter final value: {atomic_counter.value}")
2. Deadlocks
import threading
import time
# Problem: Deadlock scenario
lock1 = threading.Lock()
lock2 = threading.Lock()

def task1():
    with lock1:
        print("Task 1 acquired lock1")
        time.sleep(0.1)
        with lock2:
            print("Task 1 acquired lock2")

def task2():
    with lock2:
        print("Task 2 acquired lock2")
        time.sleep(0.1)
        with lock1:
            print("Task 2 acquired lock1")

# Solution: Always acquire locks in the same order
def safe_task1():
    with lock1:
        print("Safe Task 1 acquired lock1")
        time.sleep(0.1)
        with lock2:
            print("Safe Task 1 acquired lock2")

def safe_task2():
    with lock1:  # Same order as safe_task1
        print("Safe Task 2 acquired lock1")
        time.sleep(0.1)
        with lock2:
            print("Safe Task 2 acquired lock2")

# Alternative: Use timeout
def task_with_timeout():
    if lock1.acquire(timeout=1):
        try:
            print("Acquired lock1")
            if lock2.acquire(timeout=1):
                try:
                    print("Acquired lock2")
                    # Do work
                finally:
                    lock2.release()
            else:
                print("Could not acquire lock2")
        finally:
            lock1.release()
    else:
        print("Could not acquire lock1")
3. Memory Leaks in Multiprocessing
import multiprocessing
import time
import os
# A named module-level function is used instead of a lambda so the Process
# target can be pickled on platforms that use the "spawn" start method.
def sleep_worker(seconds=1):
    time.sleep(seconds)

# Problem: Not properly cleaning up processes
def memory_leak_example():
    processes = []
    for i in range(10):
        p = multiprocessing.Process(target=sleep_worker, args=(10,))
        p.start()
        processes.append(p)
    # Forgetting to join processes can lead to zombie processes

# Solution: Proper cleanup
def proper_process_management():
    processes = []
    try:
        for i in range(10):
            p = multiprocessing.Process(target=sleep_worker, args=(1,))
            p.start()
            processes.append(p)

        # Wait for all processes to complete
        for p in processes:
            p.join()
    except KeyboardInterrupt:
        print("Interrupting processes...")
        for p in processes:
            p.terminate()
        for p in processes:
            p.join()

# Context manager approach
from contextlib import contextmanager

@contextmanager
def managed_processes(target_func, num_processes):
    processes = []
    try:
        for i in range(num_processes):
            p = multiprocessing.Process(target=target_func)
            p.start()
            processes.append(p)
        yield processes
    finally:
        for p in processes:
            if p.is_alive():
                p.terminate()
        for p in processes:
            p.join()

# Usage
def worker_task():
    time.sleep(1)
    print(f"Worker {os.getpid()} finished")

if __name__ == "__main__":
    with managed_processes(worker_task, 4) as processes:
        print(f"Started {len(processes)} processes")
    # Processes will be properly cleaned up
4. Pickle Errors in Multiprocessing
import multiprocessing
import pickle
# Problem: Cannot pickle certain objects
class UnpicklableClass:
    def __init__(self):
        self.lambda_func = lambda x: x * 2  # Cannot pickle lambdas
        self.file_handle = open('temp.txt', 'w')  # Cannot pickle file handles

# Solution: Use picklable alternatives
class PicklableClass:
    def __init__(self):
        self.multiplier = 2

    def multiply(self, x):
        return x * self.multiplier

def process_with_method(obj, value):
    return obj.multiply(value)

# Alternative: Use dill for advanced pickling
try:
    import dill

    def advanced_pickle_function():
        func = lambda x: x * 2
        return dill.dumps(func)
except ImportError:
    print("dill not available")

# Using multiprocessing with proper pickling
def safe_multiprocessing_example():
    obj = PicklableClass()
    values = [1, 2, 3, 4, 5]

    with multiprocessing.Pool(processes=4) as pool:
        results = pool.starmap(process_with_method, [(obj, v) for v in values])

    print(f"Results: {results}")

if __name__ == "__main__":
    safe_multiprocessing_example()
5. Exception Handling in Concurrent Code
import threading
import multiprocessing
import logging
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
# Setup logging
logging.basicConfig(level=logging.INFO)

def risky_task(task_id):
    import random
    if random.random() < 0.3:  # 30% chance of failure
        raise ValueError(f"Task {task_id} failed")
    return f"Task {task_id} completed"

# Thread exception handling
def handle_thread_exceptions():
    results = []
    errors = []

    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(risky_task, i) for i in range(10)]

        for future in as_completed(futures):
            try:
                result = future.result()
                results.append(result)
            except Exception as e:
                errors.append(str(e))
                logging.error(f"Task failed: {e}")

    print(f"Completed: {len(results)}, Failed: {len(errors)}")
    return results, errors

# Process exception handling
def handle_process_exceptions():
    results = []
    errors = []

    with ProcessPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(risky_task, i) for i in range(10)]

        for future in as_completed(futures):
            try:
                result = future.result()
                results.append(result)
            except Exception as e:
                errors.append(str(e))
                logging.error(f"Process task failed: {e}")

    print(f"Completed: {len(results)}, Failed: {len(errors)}")
    return results, errors

# Custom exception handler
class ExceptionHandler:
    def __init__(self):
        self.exceptions = []
        self.lock = threading.Lock()

    def handle_exception(self, exception):
        with self.lock:
            self.exceptions.append(exception)
            logging.error(f"Exception caught: {exception}")

def task_with_exception_handler(task_id, exception_handler):
    try:
        return risky_task(task_id)
    except Exception as e:
        exception_handler.handle_exception(e)
        return None

# Usage
if __name__ == "__main__":
    print("Thread exception handling:")
    handle_thread_exceptions()

    print("\nProcess exception handling:")
    handle_process_exceptions()
6. Performance Monitoring
import time
import threading
import multiprocessing
import psutil
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
class PerformanceMonitor:
    def __init__(self):
        self.start_time = None
        self.end_time = None
        self.cpu_percent = []
        self.memory_percent = []
        self.monitoring = False
        self.monitor_thread = None

    def start_monitoring(self):
        self.start_time = time.time()
        self.monitoring = True
        self.monitor_thread = threading.Thread(target=self._monitor)
        self.monitor_thread.start()

    def stop_monitoring(self):
        self.end_time = time.time()
        self.monitoring = False
        if self.monitor_thread:
            self.monitor_thread.join()

    def _monitor(self):
        while self.monitoring:
            self.cpu_percent.append(psutil.cpu_percent())
            self.memory_percent.append(psutil.virtual_memory().percent)
            time.sleep(0.1)

    def get_stats(self):
        duration = self.end_time - self.start_time if self.end_time else 0
        return {
            'duration': duration,
            'avg_cpu': sum(self.cpu_percent) / len(self.cpu_percent) if self.cpu_percent else 0,
            'max_cpu': max(self.cpu_percent) if self.cpu_percent else 0,
            'avg_memory': sum(self.memory_percent) / len(self.memory_percent) if self.memory_percent else 0,
            'max_memory': max(self.memory_percent) if self.memory_percent else 0
        }

def cpu_intensive_task(n):
    total = 0
    for i in range(n * 100000):
        total += i
    return total

def benchmark_approaches():
    tasks = [1000] * 8

    # Sequential
    monitor = PerformanceMonitor()
    monitor.start_monitoring()
    sequential_results = [cpu_intensive_task(n) for n in tasks]
    monitor.stop_monitoring()
    sequential_stats = monitor.get_stats()

    # Threading
    monitor = PerformanceMonitor()
    monitor.start_monitoring()
    with ThreadPoolExecutor(max_workers=4) as executor:
        thread_results = list(executor.map(cpu_intensive_task, tasks))
    monitor.stop_monitoring()
    thread_stats = monitor.get_stats()

    # Multiprocessing
    monitor = PerformanceMonitor()
    monitor.start_monitoring()
    with ProcessPoolExecutor(max_workers=4) as executor:
        process_results = list(executor.map(cpu_intensive_task, tasks))
    monitor.stop_monitoring()
    process_stats = monitor.get_stats()

    print("Performance Comparison:")
    print(f"Sequential - Duration: {sequential_stats['duration']:.2f}s, "
          f"Avg CPU: {sequential_stats['avg_cpu']:.1f}%, "
          f"Max CPU: {sequential_stats['max_cpu']:.1f}%")
    print(f"Threading - Duration: {thread_stats['duration']:.2f}s, "
          f"Avg CPU: {thread_stats['avg_cpu']:.1f}%, "
          f"Max CPU: {thread_stats['max_cpu']:.1f}%")
    print(f"Multiprocessing - Duration: {process_stats['duration']:.2f}s, "
          f"Avg CPU: {process_stats['avg_cpu']:.1f}%, "
          f"Max CPU: {process_stats['max_cpu']:.1f}%")

if __name__ == "__main__":
    benchmark_approaches()
Key Takeaways
When to Use Threading
- I/O-bound operations (file reading, network requests, database queries)
- Tasks that spend time waiting for external resources
- When you need shared memory access
- Lighter weight than processes
When to Use Multiprocessing
- CPU-intensive computations
- Tasks that can be parallelized independently
- When you need to bypass the GIL
- When process isolation is important for stability
General Best Practices
- Always use context managers (with statements) for resource management
- Handle exceptions properly in concurrent code
- Use appropriate synchronization primitives to avoid race conditions
- Monitor performance to ensure concurrency is actually helping
- Consider using concurrent.futures for simpler concurrent programming
- Be mindful of the overhead of creating threads/processes
- Test concurrent code thoroughly, as bugs can be hard to reproduce
Common Pitfalls to Avoid
- Race conditions due to shared state
- Deadlocks from improper lock ordering
- Memory leaks from not properly cleaning up processes
- Pickle errors when passing objects between processes
- Not handling exceptions in concurrent tasks
- Creating too many threads/processes (use pools instead)
This guide provides a solid foundation for understanding and implementing concurrent programming in Python. Remember that the choice between threading and multiprocessing depends on your specific use case, and sometimes a hybrid approach or alternative solutions like asyncio might be more appropriate.