Skip to content

Commit 299f052

Browse files
committed
Improve update status and logs in compute worker
1 parent 61d352a commit 299f052

1 file changed

Lines changed: 87 additions & 53 deletions

File tree

compute_worker/compute_worker.py

Lines changed: 87 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import hashlib
44
import json
55
import os
6+
import traceback
67
import shutil
78
import signal
89
import socket
@@ -18,7 +19,6 @@
1819
from rich.progress import Progress
1920
from rich.pretty import pprint
2021
import requests
21-
2222
import websockets
2323
import yaml
2424
from billiard.exceptions import SoftTimeLimitExceeded
@@ -89,22 +89,31 @@
8989

9090
def show_progress(line, progress):
9191
try:
92-
if "Status: Image is up to date" in line["status"]:
93-
logger.info(line["status"])
92+
status = line.get("status") or ""
93+
layer_id = line.get("id")
94+
detail = line.get("progressDetail") or {}
95+
current = detail.get("current")
96+
total = detail.get("total")
97+
98+
if "Status: Image is up to date" in status:
99+
logger.info(status)
100+
101+
if not layer_id:
102+
return
94103

95104
completed = False
96-
if line["status"] == "Download complete":
105+
if status == "Download complete":
97106
description = (
98-
f"[blue][Download complete, waiting for extraction {line['id']}]"
107+
f"[blue][Download complete, waiting for extraction {layer_id}]"
99108
)
100109
completed = True
101-
elif line["status"] == "Downloading":
102-
description = f"[bold][Downloading {line['id']}]"
103-
elif line["status"] == "Pull complete":
104-
description = f"[green][Extraction complete {line['id']}]"
110+
elif status == "Downloading":
111+
description = f"[bold][Downloading {layer_id}]"
112+
elif status == "Pull complete":
113+
description = f"[green][Extraction complete {layer_id}]"
105114
completed = True
106-
elif line["status"] == "Extracting":
107-
description = f"[blue][Extracting {line['id']}]"
115+
elif status == "Extracting":
116+
description = f"[blue][Extracting {layer_id}]"
108117

109118
else:
110119
# skip other statuses, but show extraction progress
@@ -121,7 +130,7 @@ def show_progress(line, progress):
121130
)
122131
else:
123132
tasks[task_id] = progress.add_task(
124-
description, total=line["progressDetail"]["total"]
133+
description, total=total
125134
)
126135
else:
127136
if completed:
@@ -134,12 +143,11 @@ def show_progress(line, progress):
134143
else:
135144
progress.update(
136145
tasks[task_id],
137-
completed=line["progressDetail"]["current"],
138-
total=line["progressDetail"]["total"],
146+
completed=current,
147+
total=total,
139148
)
140149
except Exception as e:
141-
logger.error("There was an error showing the progress bar")
142-
logger.error(e)
150+
logger.exception("There was an error showing the progress bar")
143151

144152

145153
# -----------------------------------------------
@@ -244,20 +252,25 @@ def run_wrapper(run_args):
244252
run_args.update(secret=str(run_args["secret"]))
245253
logger.info(f"Received run arguments: \n {colorize_run_args(json.dumps(run_args))}")
246254
run = Run(run_args)
247-
248255
try:
249256
run.prepare()
250257
run.start()
251258
if run.is_scoring:
252259
run.push_scores()
253260
run.push_output()
254-
except DockerImagePullException as e:
255-
run._update_status(STATUS_FAILED, str(e))
256-
except SubmissionException as e:
257-
run._update_status(STATUS_FAILED, str(e))
258-
except SoftTimeLimitExceeded:
259-
run._update_status(STATUS_FAILED, "Soft time limit exceeded!")
261+
except (DockerImagePullException, SubmissionException, SoftTimeLimitExceeded) as e:
262+
run._update_status(STATUS_FAILED, traceback.format_exc())
263+
raise
264+
except Exception as e:
265+
# Catch any exception to avoid getting stuck in Running status
266+
run._update_status(STATUS_FAILED, traceback.format_exc())
267+
raise
260268
finally:
269+
try:
270+
# Try to push logs before cleanup
271+
run.push_logs()
272+
except Exception:
273+
logger.exception("push_logs failed")
261274
run.clean_up()
262275

263276

@@ -446,6 +459,22 @@ async def watch_detailed_results(self):
446459
if file_path:
447460
await self.send_detailed_results(file_path)
448461

462+
def push_logs(self):
463+
"""Upload any collected logs, even in case of crash.
464+
"""
465+
try:
466+
for kind, logs in (self.logs or {}).items():
467+
for stream_key in ("stdout", "stderr"):
468+
entry = logs.get(stream_key) if isinstance(logs, dict) else None
469+
if not entry:
470+
continue
471+
location = entry.get("location")
472+
data = entry.get("data") or b""
473+
if location:
474+
self._put_file(location, raw_data=data)
475+
except Exception as e:
476+
logger.exception(f"Failed best-effort log upload: {e}")
477+
449478
def get_detailed_results_file_path(self):
450479
default_detailed_results_path = os.path.join(
451480
self.output_dir, "detailed_results.html"
@@ -467,7 +496,7 @@ async def send_detailed_results(self, file_path):
467496
)
468497
websocket_url = f"{self.websocket_url}?kind=detailed_results"
469498
logger.info(f"Connecting to {websocket_url} for detailed results")
470-
# Wrap this with a Try ... Except otherwise a failure here will make the submission get stuck on Running
499+
# Wrap this with a Try block to avoid getting stuck on Running
471500
try:
472501
websocket = await asyncio.wait_for(
473502
websockets.connect(websocket_url), timeout=30.0
@@ -485,9 +514,7 @@ async def send_detailed_results(self, file_path):
485514
)
486515
if os.environ.get("LOG_LEVEL", "info").lower() == "debug":
487516
logger.exception(e)
488-
raise SubmissionException(
489-
"Could not connect to instance to update detailed result"
490-
)
517+
return
491518

492519
def _get_stdout_stderr_file_names(self, run_args):
493520
# run_args should be the run_args argument passed to __init__ from the run_wrapper.
@@ -513,7 +540,7 @@ def _update_submission(self, data):
513540

514541
logger.info(f"Updating submission @ {url} with data = {data}")
515542

516-
resp = self.requests_session.patch(url, data, timeout=150)
543+
resp = self.requests_session.patch(url, data=data, timeout=150)
517544
if resp.status_code == 200:
518545
logger.info("Submission updated successfully!")
519546
else:
@@ -523,23 +550,17 @@ def _update_submission(self, data):
523550
raise SubmissionException("Failure updating submission data.")
524551

525552
def _update_status(self, status, extra_information=None):
553+
# Update submission status
526554
if status not in AVAILABLE_STATUSES:
527555
raise SubmissionException(
528556
f"Status '{status}' is not in available statuses: {AVAILABLE_STATUSES}"
529557
)
530-
531-
data = {
532-
"status": status,
533-
"status_details": extra_information,
534-
}
535-
536-
# TODO: figure out if we should pull this task code later(submission.task should always be set)
537-
# When we start
538-
# if status == STATUS_SCORING:
539-
# data.update({
540-
# "task_pk": self.task_pk,
541-
# })
542-
self._update_submission(data)
558+
data = {"status": status, "status_details": extra_information}
559+
try:
560+
self._update_submission(data)
561+
except Exception as e:
562+
# Always catch exception and never raise error
563+
logger.exception(f"Failed to update submission status to {status}: {e}")
543564

544565
def _get_container_image(self, image_name):
545566
logger.info("Running pull for image: {}".format(image_name))
@@ -549,6 +570,8 @@ def _get_container_image(self, image_name):
549570
with Progress() as progress:
550571
resp = client.pull(image_name, stream=True, decode=True)
551572
for line in resp:
573+
if isinstance(line, dict) and line.get("error"):
574+
raise DockerImagePullException(line["error"])
552575
show_progress(line, progress)
553576
break # Break if the loop is successful to exit "with Progress() as progress"
554577

@@ -686,6 +709,7 @@ async def _run_container_engine_cmd(self, container, kind):
686709

687710
# Create a websocket to send the logs in real time to the codabench instance
688711
# We need to set a timeout for the websocket connection otherwise the program will get stuck if he websocket does not connect.
712+
websocket = None
689713
try:
690714
websocket_url = f"{self.websocket_url}?kind={kind}"
691715
logger.debug(
@@ -735,18 +759,20 @@ async def _run_container_engine_cmd(self, container, kind):
735759
if str(log[0]) != "None":
736760
logger.info(log[0].decode())
737761
try:
738-
await websocket.send(
739-
json.dumps({"kind": kind, "message": log[0].decode()})
740-
)
762+
if websocket is not None:
763+
await websocket.send(
764+
json.dumps({"kind": kind, "message": log[0].decode()})
765+
)
741766
except Exception as e:
742767
logger.error(e)
743768

744769
elif str(log[1]) != "None":
745770
logger.error(log[1].decode())
746771
try:
747-
await websocket.send(
748-
json.dumps({"kind": kind, "message": log[1].decode()})
749-
)
772+
if websocket is not None:
773+
await websocket.send(
774+
json.dumps({"kind": kind, "message": log[1].decode()})
775+
)
750776
except Exception as e:
751777
logger.error(e)
752778

@@ -767,7 +793,8 @@ async def _run_container_engine_cmd(self, container, kind):
767793
logger.debug(
768794
f"WORKER_MARKER: Disconnecting from {websocket_url}, program counter = {self.completed_program_counter}"
769795
)
770-
await websocket.close()
796+
if websocket is not None:
797+
await websocket.close()
771798
client.remove_container(container, force=True)
772799

773800
logger.debug(
@@ -785,6 +812,13 @@ async def _run_container_engine_cmd(self, container, kind):
785812
logger.error(e)
786813
return_Code = {"StatusCode": 1}
787814

815+
finally:
816+
try:
817+
# Last chance of removing container
818+
client.remove_container(container_id, force=True)
819+
except Exception:
820+
pass
821+
788822
self.logs[kind] = {
789823
"returncode": return_Code["StatusCode"],
790824
"start": start,
@@ -1055,9 +1089,8 @@ async def _run_program_directory(self, program_dir, kind):
10551089
try:
10561090
return await self._run_container_engine_cmd(container, kind=kind)
10571091
except Exception as e:
1058-
logger.error(e)
1059-
if os.environ.get("LOG_LEVEL", "info").lower() == "debug":
1060-
logger.exception(e)
1092+
logger.exception("Program directory execution failed")
1093+
raise SubmissionException(str(e))
10611094

10621095
def _put_dir(self, url, directory):
10631096
"""Zip the directory and send it to the given URL using _put_file."""
@@ -1099,7 +1132,7 @@ def _put_file(self, url, file=None, raw_data=None, content_type="application/zip
10991132
logger.info("Putting file %s in %s" % (file, url))
11001133
data = open(file, "rb")
11011134
headers["Content-Length"] = str(os.path.getsize(file))
1102-
elif raw_data:
1135+
elif raw_data is not None:
11031136
logger.info("Putting raw data %s in %s" % (raw_data, url))
11041137
data = raw_data
11051138
else:
@@ -1234,6 +1267,7 @@ def start(self):
12341267
asyncio.run(self._send_data_through_socket(error_message))
12351268
raise SubmissionException(error_message)
12361269
finally:
1270+
loop.close()
12371271
self.watch = False
12381272
for kind, logs in self.logs.items():
12391273
if logs["end"] is not None:
@@ -1292,7 +1326,7 @@ def start(self):
12921326
program_results, BaseException
12931327
) and not isinstance(program_results, asyncio.CancelledError)
12941328
program_rc = getattr(self, "program_exit_code", None)
1295-
failed_rc = program_rc not in (0, None)
1329+
failed_rc = (program_rc is None) or (program_rc != 0)
12961330
if had_async_exc or failed_rc:
12971331
self._update_status(
12981332
STATUS_FAILED,

0 commit comments

Comments
 (0)