Workers
Build robust background task systems with SPOC's worker framework. This guide covers everything from basic worker implementation to advanced concurrency patterns.
What Are Workers?
Workers are background task executors that run concurrently with your main application. They provide a consistent interface for running tasks in threads or processes with unified lifecycle management, error handling, and signal management.
Key Features:
- Unified API for thread and process-based concurrency
- Lifecycle management (setup, main, teardown)
- Built-in error handling and recovery
- Signal handling for graceful shutdowns
- Async/await support
- Multi-worker orchestration via Server
When to Use Workers
Workers are ideal for several scenarios:
Background Tasks
Long-running operations that shouldn't block your main application:
- Message queue processing: Process jobs from Redis, RabbitMQ, or other queues
- Scheduled tasks: Run periodic cleanup, maintenance, or data synchronization
- Event monitoring: Watch file systems, databases, or external APIs for changes
Concurrent Operations
Tasks that benefit from parallel execution:
- Data processing: Transform large datasets using multiple cores
- API request handling: Service multiple HTTP requests simultaneously
- Real-time data ingestion: Handle streams from multiple sources
Resource Isolation
Operations that need separate execution contexts:
- Memory-intensive tasks: Prevent memory leaks from affecting the main process
- Untrusted code execution: Isolate potentially problematic code in separate processes
- State management: Maintain independent state for different subsystems
ThreadWorker vs ProcessWorker
SPOC provides two worker types optimized for different workloads.
ThreadWorker
Runs tasks in a separate thread within the same process.
Best for:
- I/O-bound tasks (network requests, file operations, database queries)
- Tasks requiring shared memory with the main process
- Lightweight concurrency with low overhead
Limitations:
- Subject to Python's Global Interpreter Lock (GIL)
- CPU-bound tasks won't achieve true parallelism
- Shared memory can lead to race conditions without proper synchronization
Example - I/O Bound Task:
from spoc.workers import ThreadWorker
import time
class LogWatcher(ThreadWorker):
"""Watch a log file for new entries."""
def setup(self):
"""Initialize the log file handle."""
self.log_file = open("/var/log/app.log", "r")
self.log_file.seek(0, 2) # Move to end of file
def main(self):
"""Watch for new log lines."""
while self.is_running:
line = self.log_file.readline()
if line:
self.process_log_line(line)
else:
time.sleep(0.1) # Wait for new content
def teardown(self):
"""Close the log file."""
self.log_file.close()
def process_log_line(self, line):
"""Process a single log line."""
if "ERROR" in line:
print(f"Error detected: {line.strip()}")
ProcessWorker
Runs tasks in a separate process with independent memory.
Best for:
- CPU-bound tasks (data processing, calculations, compression)
- Tasks that need to bypass the GIL for true parallelism
- Operations requiring complete memory isolation
Trade-offs:
- Higher overhead (separate Python interpreter)
- No shared memory (requires inter-process communication)
- More resource intensive
Example - CPU Bound Task:
from spoc.workers import ProcessWorker
import hashlib
class HashCalculator(ProcessWorker):
"""Calculate hashes for files in a queue."""
def setup(self):
"""Initialize worker state."""
self.processed_count = 0
def main(self):
"""Process files from queue."""
while self.is_running:
# Simulate getting file from queue
file_path = self.get_next_file()
if file_path:
self.calculate_hash(file_path)
self.processed_count += 1
def calculate_hash(self, file_path):
"""Calculate SHA-256 hash of a file."""
sha256 = hashlib.sha256()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
sha256.update(chunk)
return sha256.hexdigest()
def get_next_file(self):
"""Get next file from queue (placeholder)."""
# In real implementation, get from shared queue
return None
Quick Reference
| Feature | ThreadWorker | ProcessWorker |
|---|---|---|
| Concurrency | Threading | Multiprocessing |
| Memory | Shared | Isolated |
| GIL Impact | Yes | No |
| Overhead | Low | High |
| Best For | I/O-bound | CPU-bound |
| Startup Time | Fast | Slower |
| Communication | Direct | IPC required |
Creating Custom Workers
All workers inherit from AbstractWorker and implement three lifecycle methods.
Basic Worker Structure
from spoc.workers import ThreadWorker
class MyWorker(ThreadWorker):
"""Custom worker implementation."""
def setup(self):
"""
Called once before main() starts.
Use for:
- Opening file handles or database connections
- Initializing state variables
- Setting up resources
"""
self.context.connection = self.create_connection()
self.counter = 0
def main(self):
"""
Main worker loop.
Runs until self.is_running becomes False.
Should check is_running regularly to allow graceful shutdown.
"""
while self.is_running:
# Do work here
self.counter += 1
time.sleep(1)
def teardown(self):
"""
Called once after main() completes.
Use for:
- Closing file handles or database connections
- Saving state
- Cleanup operations
"""
self.context.connection.close()
print(f"Processed {self.counter} items")
The Worker Lifecycle
sequenceDiagram
participant User
participant Worker
participant Lifecycle
User->>Worker: start()
Worker->>Lifecycle: emit "startup" event
Worker->>Worker: setup()
Worker->>Worker: main()
Note over Worker: Worker runs...
User->>Worker: stop()
Worker->>Worker: Exit main()
Worker->>Worker: teardown()
Worker->>Lifecycle: emit "shutdown" event
Lifecycle Events
Workers emit lifecycle events that you can handle:
from spoc.workers import ThreadWorker
class MonitoredWorker(ThreadWorker):
"""Worker with lifecycle event handling."""
def lifecycle(self, event_type, **data):
"""
Handle lifecycle events.
Args:
event_type: Type of event ("startup", "shutdown", "error")
**data: Additional event data (e.g., exception for errors)
"""
match event_type:
case "startup":
print(f"{self.name} is starting up")
self.context.start_time = time.time()
case "shutdown":
elapsed = time.time() - self.context.start_time
print(f"{self.name} ran for {elapsed:.2f} seconds")
case "error":
exception = data.get("exception")
print(f"{self.name} error: {exception}")
# Log to monitoring system, send alerts, etc.
def main(self):
"""Worker main loop."""
while self.is_running:
# Do work
time.sleep(1)
Available Events:
"startup"- Worker has started, setup() completed"shutdown"- Worker is shutting down, teardown() completed"error"- An exception occurred during execution (includesexceptionin data)
The Server Class
The Server manages multiple workers as a cohesive unit, handling startup, shutdown, and signal management.
Basic Server Usage
from spoc.workers import Server, ThreadWorker, ProcessWorker
class Worker1(ThreadWorker):
def main(self):
while self.is_running:
print(f"{self.name} working...")
time.sleep(1)
class Worker2(ProcessWorker):
def main(self):
while self.is_running:
print(f"{self.name} processing...")
time.sleep(1)
# Create server
server = Server(name="MyAppServer")
# Add workers
server.add_worker(Worker1(name="ThreadWorker-1"))
server.add_worker(Worker2(name="ProcessWorker-1"))
# Start all workers and run until interrupted
server.run_forever()
Server Methods
# Create a server
server = Server(name="MyServer")
# Add workers
server.add_worker(worker1)
server.add_worker(worker2)
# Start all workers
server.start()
# Stop all workers
server.stop()
# Wait for all workers to finish
server.join_all(timeout=5.0)
# Run until interrupted (combines start, wait, stop, join)
server.run_forever()
run_forever() and Signal Handling
The run_forever() method provides automatic signal handling for graceful shutdowns.
from spoc.workers import Server, ThreadWorker
import signal
class GracefulWorker(ThreadWorker):
def setup(self):
"""Initialize state."""
self.items_processed = 0
def main(self):
"""Process items until stopped."""
while self.is_running:
# Simulate processing
self.items_processed += 1
time.sleep(0.5)
def teardown(self):
"""Save state before shutdown."""
print(f"Processed {self.items_processed} items before shutdown")
# Create and configure server
server = Server(name="GracefulServer")
server.add_worker(GracefulWorker(name="Worker-1"))
# Run until SIGINT (Ctrl+C) or SIGTERM
server.run_forever()
How Signal Handling Works:
- Server registers handlers for
SIGINTandSIGTERM - When signal received, server calls
stop()on all workers - Workers exit their main loops gracefully
- Teardown methods execute for cleanup
- Server waits for all workers to finish (with timeout)
Handled Signals:
SIGINT- Interrupt from keyboard (Ctrl+C)SIGTERM- Termination request from system/container orchestrator
Async Support
Workers fully support async/await for modern asynchronous programming.
Async Worker Example
from spoc.workers import ThreadWorker
import asyncio
class AsyncAPIWorker(ThreadWorker):
"""Worker that makes async HTTP requests."""
async def setup(self):
"""Async setup method."""
# Create async HTTP client
self.session = await self.create_session()
async def main(self):
"""Async main loop."""
while self.is_running:
# Fetch data from API
data = await self.fetch_data()
await self.process_data(data)
await asyncio.sleep(5)
async def teardown(self):
"""Async cleanup."""
await self.session.close()
async def fetch_data(self):
"""Fetch data from API."""
# Simulated async HTTP request
await asyncio.sleep(0.1)
return {"status": "ok"}
async def process_data(self, data):
"""Process fetched data."""
print(f"Processed: {data}")
async def create_session(self):
"""Create HTTP session."""
# In real code, use aiohttp or similar
return object()
async def lifecycle(self, event_type, **data):
"""Async lifecycle handler."""
print(f"Async event: {event_type}")
run_async_safely()
The run_async_safely() function runs coroutines correctly whether an event loop exists or not.
from spoc.workers import run_async_safely
import asyncio
async def my_coroutine():
"""Example async function."""
await asyncio.sleep(1)
return "Done"
# Safely run coroutine (creates event loop if needed)
run_async_safely(my_coroutine())
Behavior:
- If event loop is running: Creates task in existing loop
- If no event loop: Creates new loop and runs coroutine with
asyncio.run() - Automatically uses uvloop if available for better performance
uvloop Integration
SPOC automatically uses uvloop for improved async performance when available.
Enable uvloop
from spoc.workers import set_uvloop_if_available
# Enable uvloop for all async operations
set_uvloop_if_available()
# Now create async workers - they'll use uvloop automatically
worker = AsyncAPIWorker(name="FastWorker")
worker.start()
Benefits:
- 2-4x faster than default asyncio event loop
- Better performance for I/O-bound async workloads
- Drop-in replacement (no code changes needed)
Installation:
Worker Patterns and Best Practices
Pattern 1: Background Job Processor
Process jobs from a queue in the background.
from spoc.workers import ThreadWorker
from queue import Queue
import time
class JobProcessor(ThreadWorker):
"""Process jobs from a shared queue."""
def __init__(self, name, job_queue):
super().__init__(name)
self.job_queue = job_queue
def main(self):
"""Process jobs until stopped."""
while self.is_running:
try:
# Get job with timeout to allow checking is_running
job = self.job_queue.get(timeout=1.0)
self.process_job(job)
self.job_queue.task_done()
except Exception:
# Queue empty, continue
continue
def process_job(self, job):
"""Process a single job."""
print(f"{self.name} processing: {job}")
time.sleep(0.5) # Simulate work
# Create shared queue
job_queue = Queue()
# Create server with multiple workers
server = Server(name="JobServer")
server.add_worker(JobProcessor("Worker-1", job_queue))
server.add_worker(JobProcessor("Worker-2", job_queue))
# Add jobs
for i in range(10):
job_queue.put(f"Job-{i}")
# Process jobs
server.start()
job_queue.join() # Wait for all jobs to complete
server.stop()
server.join_all()
Pattern 2: Periodic Tasks
Run tasks on a schedule.
from spoc.workers import ThreadWorker
import time
from datetime import datetime
class PeriodicWorker(ThreadWorker):
"""Execute tasks on a schedule."""
def __init__(self, name, interval_seconds):
super().__init__(name)
self.interval = interval_seconds
def main(self):
"""Run task periodically."""
while self.is_running:
start_time = time.time()
# Execute the task
self.execute_task()
# Sleep for remaining interval
elapsed = time.time() - start_time
sleep_time = max(0, self.interval - elapsed)
# Sleep in small chunks to check is_running
while sleep_time > 0 and self.is_running:
time.sleep(min(1.0, sleep_time))
sleep_time -= 1.0
def execute_task(self):
"""Override this method with your task logic."""
print(f"[{datetime.now()}] Task executed")
class DataSyncWorker(PeriodicWorker):
"""Sync data every 5 minutes."""
def __init__(self, name):
super().__init__(name, interval_seconds=300)
def execute_task(self):
"""Sync data from remote source."""
print(f"Syncing data at {datetime.now()}")
# Perform sync logic here
# Run periodic worker
worker = DataSyncWorker(name="DataSync")
server = Server()
server.add_worker(worker)
server.run_forever()
Pattern 3: Multi-Stage Pipeline
Process data through multiple stages with different workers.
from spoc.workers import ThreadWorker, ProcessWorker
from queue import Queue
class StageWorker(ThreadWorker):
"""Base class for pipeline stage workers."""
def __init__(self, name, input_queue, output_queue):
super().__init__(name)
self.input_queue = input_queue
self.output_queue = output_queue
def main(self):
"""Process items from input to output."""
while self.is_running:
try:
item = self.input_queue.get(timeout=1.0)
result = self.process_item(item)
if result is not None:
self.output_queue.put(result)
self.input_queue.task_done()
except Exception:
continue
def process_item(self, item):
"""Override this method for stage-specific processing."""
raise NotImplementedError
class FetchStage(StageWorker):
"""Stage 1: Fetch raw data."""
def process_item(self, item):
"""Fetch data."""
return {"id": item, "raw_data": f"data-{item}"}
class TransformStage(ProcessWorker):
"""Stage 2: Transform data (CPU-intensive)."""
def __init__(self, name, input_queue, output_queue):
super().__init__(name)
self.input_queue = input_queue
self.output_queue = output_queue
def main(self):
"""Transform items."""
while self.is_running:
try:
item = self.input_queue.get(timeout=1.0)
result = self.transform(item)
self.output_queue.put(result)
self.input_queue.task_done()
except Exception:
continue
def transform(self, item):
"""CPU-intensive transformation."""
# Simulate heavy processing
return {**item, "transformed": True}
class SaveStage(StageWorker):
"""Stage 3: Save results."""
def process_item(self, item):
"""Save to database."""
print(f"Saved: {item}")
return None # End of pipeline
# Create pipeline
fetch_to_transform = Queue()
transform_to_save = Queue()
input_queue = Queue()
server = Server(name="Pipeline")
server.add_worker(FetchStage("Fetch", input_queue, fetch_to_transform))
server.add_worker(TransformStage("Transform", fetch_to_transform, transform_to_save))
server.add_worker(SaveStage("Save", transform_to_save, Queue()))
# Feed pipeline
for i in range(10):
input_queue.put(i)
# Run pipeline
server.run_forever()
Pattern 4: Health Monitoring
Monitor worker health and restart on failure.
from spoc.workers import ThreadWorker
import time
class HealthMonitor(ThreadWorker):
"""Monitor and restart failed workers."""
def __init__(self, name, workers_to_monitor):
super().__init__(name)
self.workers = workers_to_monitor
def main(self):
"""Check worker health periodically."""
while self.is_running:
for worker in self.workers:
if not worker._thread_or_process.is_alive():
print(f"Worker {worker.name} died, restarting...")
self.restart_worker(worker)
time.sleep(5) # Check every 5 seconds
def restart_worker(self, worker):
"""Restart a failed worker."""
# Create new instance of same worker type
new_worker = type(worker)(worker.name)
new_worker.start()
# Replace in monitoring list
idx = self.workers.index(worker)
self.workers[idx] = new_worker
def lifecycle(self, event_type, **data):
"""Handle lifecycle events."""
if event_type == "error":
print(f"Health monitor detected error: {data.get('exception')}")
Best Practices
1. Always Check is_running
Check self.is_running regularly in your main loop to allow graceful shutdown:
def main(self):
"""Good: Checks is_running regularly."""
while self.is_running:
self.do_work()
time.sleep(1)
def main(self):
"""Bad: No shutdown check."""
while True: # Will never stop gracefully
self.do_work()
time.sleep(1)
2. Use Appropriate Worker Type
Choose based on your workload:
# I/O-bound: Use ThreadWorker
class APIClient(ThreadWorker):
def main(self):
while self.is_running:
response = requests.get("https://api.example.com")
self.process(response)
# CPU-bound: Use ProcessWorker
class DataCruncher(ProcessWorker):
def main(self):
while self.is_running:
data = self.get_data()
result = self.heavy_computation(data)
3. Clean Up Resources
Always clean up in teardown():
def setup(self):
"""Acquire resources."""
self.db_conn = connect_to_database()
self.file_handle = open("data.txt", "w")
def teardown(self):
"""Release resources."""
if hasattr(self, 'db_conn'):
self.db_conn.close()
if hasattr(self, 'file_handle'):
self.file_handle.close()
4. Handle Errors Gracefully
Catch and handle exceptions in your worker code:
def main(self):
"""Handle errors without crashing."""
while self.is_running:
try:
self.do_risky_operation()
except ConnectionError as e:
print(f"Connection failed: {e}, retrying...")
time.sleep(5)
except Exception as e:
# Let unexpected errors propagate to lifecycle handler
raise
5. Use Lifecycle Events for Monitoring
Implement lifecycle events for observability:
def lifecycle(self, event_type, **data):
"""Send lifecycle events to monitoring system."""
match event_type:
case "startup":
metrics.increment("worker.started")
case "shutdown":
metrics.increment("worker.stopped")
case "error":
metrics.increment("worker.errors")
logger.error(f"Worker error: {data['exception']}")
6. Use Context for Shared State
Use self.context for sharing state between methods:
def setup(self):
"""Initialize shared context."""
self.context.processed_count = 0
self.context.start_time = time.time()
def main(self):
"""Access shared context."""
while self.is_running:
self.do_work()
self.context.processed_count += 1
def teardown(self):
"""Report using shared context."""
elapsed = time.time() - self.context.start_time
rate = self.context.processed_count / elapsed
print(f"Processed {self.context.processed_count} items at {rate:.2f}/sec")
7. Test Worker Logic Separately
Extract business logic for easier testing:
class DataWorker(ThreadWorker):
"""Worker with testable logic."""
def main(self):
"""Worker loop."""
while self.is_running:
data = self.fetch_data()
result = self.process_data(data) # Testable
self.save_result(result)
@staticmethod
def process_data(data):
"""Pure function - easy to test."""
return data.upper()
# Test the logic
def test_process_data():
assert DataWorker.process_data("hello") == "HELLO"
Common Pitfalls
Pitfall 1: Blocking Operations Without Timeout
# Bad: Will block forever, can't shut down gracefully
def main(self):
while self.is_running:
item = self.queue.get() # Blocks forever if queue empty
self.process(item)
# Good: Use timeout to check is_running
def main(self):
while self.is_running:
try:
item = self.queue.get(timeout=1.0)
self.process(item)
except Empty:
continue
Pitfall 2: Shared Mutable State (ThreadWorker)
# Bad: Race condition with shared state
class CounterWorker(ThreadWorker):
shared_counter = 0 # Class variable - shared!
def main(self):
while self.is_running:
self.shared_counter += 1 # Race condition!
# Good: Use instance variables or locks
class SafeCounterWorker(ThreadWorker):
def setup(self):
self.counter = 0 # Instance variable
def main(self):
while self.is_running:
self.counter += 1 # Safe
Pitfall 3: Memory Leaks in Long-Running Workers
# Bad: Unbounded memory growth
def main(self):
results = []
while self.is_running:
result = self.compute()
results.append(result) # List grows forever!
# Good: Process and discard
def main(self):
while self.is_running:
result = self.compute()
self.save_result(result) # Process immediately
Summary
You now understand:
- When to use workers for background tasks and concurrency
- ThreadWorker vs ProcessWorker trade-offs and use cases
- Worker lifecycle (setup, main, teardown)
- The Server class for managing multiple workers
- Signal handling with run_forever()
- Async/await support with run_async_safely()
- uvloop integration for performance
- Common patterns for job processing, periodic tasks, and pipelines
- Best practices for robust worker implementation
Workers provide a solid foundation for building concurrent, fault-tolerant systems. Use them to offload background work, process data in parallel, and build responsive applications.
Next Steps
- Study Examples: Review
examples/workers.pyfor working code - Framework Integration: Combine workers with SPOC Framework for complete applications
- Advanced Patterns: Explore worker pools, retry logic, and circuit breakers
- API Reference: See Framework API for complete documentation