-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathworker.py
More file actions
141 lines (112 loc) · 4.35 KB
/
Copy pathworker.py
File metadata and controls
141 lines (112 loc) · 4.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import json
import logging
import os
import threading
import time
from dotenv import load_dotenv
from flask import Flask, jsonify, request
import redis
load_dotenv()
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
PORT_WORKER = int(os.getenv("PORT_WORKER", "8081"))
WORKER_COUNT = int(os.getenv("WORKER_COUNT", "3"))
QUEUE_NAME = "task_queue"
LOG_FILE = "logs.txt"
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger("worker")
rdb = redis.Redis.from_url(REDIS_URL, decode_responses=True)
metrics_lock = threading.Lock()
log_file_lock = threading.Lock()
jobs_done = 0
jobs_failed = 0
def _append_log(line: str) -> None:
with log_file_lock:
with open(LOG_FILE, "a", encoding="utf-8") as f:
f.write(line + "\n")
def log_success(task: dict) -> None:
payload = json.dumps(task.get("payload", {}))
_append_log(f"SUCCESS: Task type: {task.get('type')} Task payload: {payload} "
f"Retries left: {task.get('retries', 0)}")
def log_failure(task: dict, err: Exception) -> None:
payload = json.dumps(task.get("payload", {}))
_append_log(f"FAILURE: Task type: {task.get('type')} Task payload: {payload} "
f"Retries left: {task.get('retries', 0)} Error message: {err}")
def process_task(task: dict) -> None:
payload = task.get("payload")
if payload is None:
raise ValueError("payload is empty")
task_type = task.get("type", "")
if task_type == "send_email":
time.sleep(2)
log.info("Sending email to %s with subject %s", payload.get("to"), payload.get("subject"))
elif task_type == "resize_image":
log.info("Resizing image to x: %s y: %s", payload.get("new_x"), payload.get("new_y"))
elif task_type == "generate_pdf":
log.info("Generating pdf...")
elif task_type == "":
raise ValueError("task type is empty")
else:
raise ValueError(f"unsupported task: {task_type}")
def run_worker(worker_id: int) -> None:
global jobs_done, jobs_failed
log.info("Worker %s started", worker_id)
while True:
try:
popped = rdb.blpop(QUEUE_NAME, timeout=0)
except redis.RedisError as exc:
log.error("Worker %s: Redis BLPOP failed: %s. Backing off.", worker_id, exc)
time.sleep(1)
continue
if popped is None:
continue
_, raw = popped
try:
task = json.loads(raw)
except json.JSONDecodeError as exc:
log.error("Worker %s: dropping malformed task (%s): %s", worker_id, exc, raw)
with metrics_lock:
jobs_failed += 1
continue
try:
process_task(task)
except Exception as exc:
with metrics_lock:
jobs_failed += 1
log_failure(task, exc)
retries = int(task.get("retries", 0))
if retries > 0:
task["retries"] = retries - 1
log.warning("Worker %s: task failed (%s); requeueing with %s retries left",
worker_id, exc, task["retries"])
try:
rdb.rpush(QUEUE_NAME, json.dumps(task))
except redis.RedisError as push_exc:
log.error("Worker %s: failed to requeue task: %s", worker_id, push_exc)
else:
log.error("Worker %s: task failed after all retries (%s)", worker_id, exc)
continue
with metrics_lock:
jobs_done += 1
log_success(task)
log.info("Worker %s: task done successfully", worker_id)
app = Flask(__name__)
@app.route("/metrics", methods=["GET"])
def metrics():
try:
total_in_queue = rdb.llen(QUEUE_NAME)
except redis.RedisError as exc:
return jsonify(error=f"Redis error: {exc}"), 500
with metrics_lock:
return jsonify(
total_jobs_in_queue=total_in_queue,
jobs_done=jobs_done,
jobs_failed=jobs_failed,
)
def start_workers() -> None:
for i in range(WORKER_COUNT):
t = threading.Thread(target=run_worker, args=(i,), daemon=True)
t.start()
if __name__ == "__main__":
start_workers()
log.info("Starting worker HTTP server on port %s with %s worker threads", PORT_WORKER, WORKER_COUNT)
app.run(host="0.0.0.0", port=PORT_WORKER, use_reloader=False)