Skip to content

Commit 3ba0c19

Browse files
committed
Ensure logs are returned to the platform, make loop.close() more robust
1 parent d1dd772 commit 3ba0c19

1 file changed

Lines changed: 31 additions & 6 deletions

File tree

compute_worker/compute_worker.py

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -704,6 +704,10 @@ async def _run_container_engine_cmd(self, container, kind):
704704
# Creating this and setting 2 values to None in case there is not enough time for the worker to get logs, otherwise we will have errors later on
705705
logs_Unified = [None, None]
706706

707+
# To store on-going logs and avoid empty logs returning to the platform
708+
stdout_chunks = []
709+
stderr_chunks = []
710+
707711
# Create a websocket to send the logs in real time to the codabench instance
708712
# We need to set a timeout for the websocket connection otherwise the program will get stuck if he websocket does not connect.
709713
websocket = None
@@ -753,7 +757,9 @@ async def _run_container_engine_cmd(self, container, kind):
753757
"Show the logs and stream them to codabench " + container.get("Id")
754758
)
755759
for log in container_LogsDemux:
756-
if str(log[0]) != "None":
760+
# Output
761+
if log[0] is not None:
762+
stdout_chunks.append(log[0])
757763
logger.info(log[0].decode())
758764
try:
759765
if websocket is not None:
@@ -762,8 +768,10 @@ async def _run_container_engine_cmd(self, container, kind):
762768
)
763769
except Exception as e:
764770
logger.error(e)
765-
766-
elif str(log[1]) != "None":
771+
772+
# Errors
773+
elif log[1] is not None:
774+
stderr_chunks.append(log[1])
767775
logger.error(log[1].decode())
768776
try:
769777
if websocket is not None:
@@ -785,13 +793,17 @@ async def _run_container_engine_cmd(self, container, kind):
785793
# Get the return code of the competition container once done
786794
try:
787795
# Gets the logs of the container, sperating stdout and stderr (first and second position) thanks for demux=True
788-
logs_Unified = client.attach(container, logs=True, demux=True)
789796
return_Code = client.wait(container)
797+
logs_Unified = (b"".join(stdout_chunks), b"".join(stderr_chunks))
790798
logger.debug(
791799
f"WORKER_MARKER: Disconnecting from {websocket_url}, program counter = {self.completed_program_counter}"
792800
)
793801
if websocket is not None:
794-
await websocket.close()
802+
try:
803+
await websocket.close()
804+
await websocket.wait_closed()
805+
except Exception as e:
806+
logger.error(e)
795807
client.remove_container(container, force=True)
796808

797809
logger.debug(
@@ -1268,9 +1280,22 @@ def start(self):
12681280

12691281
finally:
12701282
signal.alarm(0)
1283+
self.watch = False
1284+
1285+
# Cancel any remaining pending tasks before closing the loop
1286+
pending = [t for t in asyncio.all_tasks(loop) if not t.done()]
1287+
for task in pending:
1288+
task.cancel()
1289+
if pending:
1290+
try:
1291+
loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
1292+
except Exception:
1293+
pass
1294+
1295+
# Close loop
12711296
asyncio.set_event_loop(None)
12721297
loop.close()
1273-
self.watch = False
1298+
12741299
for kind, logs in self.logs.items():
12751300
if logs["end"] is not None:
12761301
elapsed_time = logs["end"] - logs["start"]

0 commit comments

Comments
 (0)