sricursion/workqueue

WorkQueue

A small background-task processing system in Python. The producer accepts JSON tasks over HTTP and pushes them onto a Redis list; the worker pops them and runs them on three threads. Failed tasks are retried, and every outcome is appended to a log file at a relative path.

Architecture

                  POST /enqueue
   HTTP client  ───────────────►  producer.py  ──RPUSH──►  Redis ("task_queue")
                                                                │
                                                                │ BLPOP
                                                                ▼
                                                         worker.py
                                                       (3 threads ─►  dispatch on `type`)
                                                                │
                                                                ▼
                                                           logs.txt

   GET /metrics  ─────────────►  worker.py

Setup

pip install -r requirements.txt
cp .env.example .env       # adjust REDIS_URL / ports if needed

You need a Redis instance reachable at the REDIS_URL you put in .env.

In two terminals:

python producer.py         # listens on $PORT_PRODUCER (default 8080)
python worker.py           # listens on $PORT_WORKER  (default 8081), starts 3 worker threads
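
As a rough illustration of how the settings above could be read, here is a minimal sketch. It assumes the variables are already present in the process environment (how the project loads .env is not shown here). The function name load_settings and the Redis fallback URL are hypothetical; only the two port defaults come from this README.

```python
import os


def load_settings(env=None):
    """Read connection settings from environment variables, with fallbacks."""
    env = os.environ if env is None else env
    return {
        # Assumption: this fallback URL is illustrative, not taken from the project.
        "redis_url": env.get("REDIS_URL", "redis://localhost:6379/0"),
        # Defaults documented above: 8080 for the producer, 8081 for the worker.
        "port_producer": int(env.get("PORT_PRODUCER", "8080")),
        "port_worker": int(env.get("PORT_WORKER", "8081")),
    }
```

Passing a plain dict instead of os.environ makes the function easy to exercise without touching the real environment.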

Producer — POST /enqueue

curl -X POST http://localhost:8080/enqueue \
     -H "Content-Type: application/json" \
     -d '{"type": "send_email",
          "retries": 3,
          "payload": {"to": "alice@example.com", "subject": "Welcome"}}'

Fields:

  • type — required. Selects the handler in the worker.
  • retries — number of retries after the initial attempt. retries: 2 means up to 3 attempts in total.
  • payload — arbitrary key/value object passed to the handler.

Validation:

  • type must be present and non-empty.
  • For type: "send_email", the payload must contain both to and subject.

Returns 200 on success, 400 on validation failure, 500 if Redis is unreachable.
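
The validation rules above can be sketched as a small pure function. This is not the code in producer.py; the name validate_task, the return shape, and the error strings are all assumptions made for illustration.

```python
def validate_task(task):
    """Return (ok, error_message) for a decoded /enqueue request body."""
    if not isinstance(task, dict):
        return False, "body must be a JSON object"

    task_type = task.get("type")
    if not isinstance(task_type, str) or not task_type:
        return False, "type must be present and non-empty"

    # Only send_email has extra payload requirements.
    if task_type == "send_email":
        payload = task.get("payload")
        if not isinstance(payload, dict):
            return False, "send_email requires a payload object"
        missing = [key for key in ("to", "subject") if key not in payload]
        if missing:
            return False, "send_email payload is missing: " + ", ".join(missing)

    return True, None
```

A handler built on this would answer 400 when ok is false and only then attempt the RPUSH, so that a Redis failure is the one case that maps to 500.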

Worker — GET /metrics

{
  "total_jobs_in_queue": 0,
  "jobs_done": 12,
  "jobs_failed": 3
}

Three daemon threads block on BLPOP task_queue, parse the JSON, and dispatch by type. If the handler raises, the task is requeued with retries decremented; a task that fails with no retries left is dropped. Counters are updated under a lock so the metrics stay accurate under concurrency.
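
The retry and counter behaviour described above can be sketched without Redis by passing the requeue step in as a callable. Everything named here (Metrics, run_once, the requeue parameter) is hypothetical, and counting every failed attempt in jobs_failed is an assumption; the real worker may count only dropped tasks.

```python
import json
import threading


class Metrics:
    """Counters guarded by a lock so concurrent threads do not lose updates."""

    def __init__(self):
        self._lock = threading.Lock()
        self.jobs_done = 0
        self.jobs_failed = 0

    def record(self, succeeded):
        with self._lock:
            if succeeded:
                self.jobs_done += 1
            else:
                self.jobs_failed += 1


def run_once(raw_task, handler, requeue, metrics):
    """Process one raw JSON task; requeue it with one fewer retry on failure."""
    task = json.loads(raw_task)
    try:
        handler(task)
    except Exception:
        # Assumption: each failed attempt is counted, not just the final drop.
        metrics.record(succeeded=False)
        retries = int(task.get("retries", 0))
        if retries > 0:
            task["retries"] = retries - 1
            requeue(json.dumps(task))  # in the worker this would be an RPUSH
        return False
    metrics.record(succeeded=True)
    return True
```

With retries: 2 and a handler that always raises, this runs three attempts in total before the task is dropped, which matches the semantics given for the retries field.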

Built-in task types

type            What the handler does
send_email      sleeps 2s, then prints payload.to and payload.subject
resize_image    prints payload.new_x and payload.new_y
generate_pdf    prints Generating pdf...
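
For orientation, a dispatcher with the three behaviours listed above might look like the sketch below. It is not copied from worker.py: the sleep parameter is a hypothetical hook for tests, and raising ValueError for unknown types is an assumption (the README only says that unsupported types fail).

```python
import time


def process_task(task, sleep=time.sleep):
    """Dispatch a decoded task on its `type`."""
    task_type = task.get("type")
    payload = task.get("payload") or {}

    if task_type == "send_email":
        sleep(2)  # stands in for the slow part of sending
        print(payload["to"], payload["subject"])
    elif task_type == "resize_image":
        print(payload["new_x"], payload["new_y"])
    elif task_type == "generate_pdf":
        print("Generating pdf...")
    else:
        # Assumption: unknown types raise so they follow the retry path.
        raise ValueError(f"unsupported task type: {task_type!r}")
```

A missing payload key raises KeyError here, which would also send the task down the retry path.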

Adding a new task type

In worker.py, add a branch to process_task:

elif task_type == "your_task":
    # do the work here; raise any Exception to trigger the retry path
    ...

That's the only change required — the producer accepts any type (validation beyond type presence only applies to send_email).

Logging

Every processed task — success or failure — is appended to logs.txt in the worker's working directory.

SUCCESS: Task type: <type> Task payload: {...} Retries left: <n>
FAILURE: Task type: <type> Task payload: {...} Retries left: <n> Error message: <err>

See sample-logs.txt for real output captured from a run that mixed successful tasks with two failing ones (an unsupported type with retries: 2, and an empty type with retries: 1).
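
A minimal sketch of producing lines in the format above. The helper names format_log_line and append_log are hypothetical, and rendering the payload with json.dumps is an assumption; the worker may print the dict differently.

```python
import json


def format_log_line(task, error=None):
    """Build one log line; pass `error` to produce a FAILURE line."""
    status = "SUCCESS" if error is None else "FAILURE"
    line = (
        f"{status}: Task type: {task.get('type')} "
        f"Task payload: {json.dumps(task.get('payload', {}))} "
        f"Retries left: {task.get('retries', 0)}"
    )
    if error is not None:
        line += f" Error message: {error}"
    return line


def append_log(line, path="logs.txt"):
    """Append a line to the log file, relative to the current working directory."""
    with open(path, "a", encoding="utf-8") as fh:
        fh.write(line + "\n")
```

Because the path is relative, the file lands wherever the worker process was started, so running the worker from a different directory produces a separate logs.txt.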
