A small background-task processing system in Python. Producer accepts tasks over HTTP, worker pulls them from Redis and runs them. Three worker threads, retries, JSON tasks, and a relative-path log file.
POST /enqueue
HTTP client ───────────────► producer.py ──RPUSH──► Redis ("task_queue")
│
│ BLPOP
▼
worker.py
(3 threads ─► dispatch on `type`)
│
▼
logs.txt
GET /metrics ─────────────► worker.py
pip install -r requirements.txt
cp .env.example .env # adjust REDIS_URL / ports if neededYou need a Redis instance reachable at the REDIS_URL you put in .env.
In two terminals:
python producer.py # listens on $PORT_PRODUCER (default 8080)
python worker.py # listens on $PORT_WORKER (default 8081), starts 3 worker threadscurl -X POST http://localhost:8080/enqueue \
-H "Content-Type: application/json" \
-d '{"type": "send_email",
"retries": 3,
"payload": {"to": "alice@example.com", "subject": "Welcome"}}'Fields:
- type — required. Selects the handler in the worker.
- retries — number of retries after the initial attempt.
retries: 2means up to 3 attempts in total. - payload — arbitrary key/value object passed to the handler.
Validation:
typemust be present and non-empty.- For
type: "send_email", the payload must contain bothtoandsubject.
Returns 200 on success, 400 on validation failure, 500 if Redis is
unreachable.
{
"total_jobs_in_queue": 0,
"jobs_done": 12,
"jobs_failed": 3
}Three daemon threads sit on BLPOP task_queue, parse the JSON, and dispatch
by type. If the handler raises, the task is requeued with retries
decremented; once retries hits zero it's dropped. Counters are mutated
under a lock so the metrics are accurate under concurrency.
type |
What the handler does |
|---|---|
send_email |
sleeps 2s, then prints payload.to and payload.subject |
resize_image |
prints payload.new_x and payload.new_y |
generate_pdf |
prints Generating pdf... |
In worker.py, add a branch to process_task:
elif task_type == "your_task":
# do the work here; raise any Exception to trigger the retry path
...That's the only change required — the producer accepts any type (validation
beyond type presence only applies to send_email).
Every processed task — success or failure — is appended to logs.txt in the
worker's working directory.
SUCCESS: Task type: <type> Task payload: {...} Retries left: <n>
FAILURE: Task type: <type> Task payload: {...} Retries left: <n> Error message: <err>
See sample-logs.txt for real output captured from a run
that mixed successful tasks with two failing ones (an unsupported type
with retries: 2, and an empty type with retries: 1).