Skip to content

Commit a1fb029

Browse files
authored
Add files via upload
1 parent a40e94e commit a1fb029

1 file changed

Lines changed: 65 additions & 25 deletions

File tree

multioptpy/Wrapper/mapper.py

Lines changed: 65 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
from concurrent.futures import (
2828
ProcessPoolExecutor,
2929
as_completed,
30+
BrokenProcessPool,
3031
TimeoutError as FuturesTimeoutError,
3132
)
3233
from dataclasses import dataclass, field
@@ -595,7 +596,7 @@ def _kabsch_rmsd(pa: np.ndarray, pb: np.ndarray) -> float:
595596
U, S, Vt = np.linalg.svd(pb.T @ pa)
596597
d = np.linalg.det(U) * np.linalg.det(Vt)
597598

598-
if d < 0:
599+
if d < -0.1:
599600
return float("inf")
600601

601602
E0 = np.sum(pa ** 2) + np.sum(pb ** 2)
@@ -2310,22 +2311,25 @@ def _run_sequential(self, history_log: str, priority_log: str) -> None:
23102311
Executor lifetime
23112312
-----------------
23122313
A single ProcessPoolExecutor(max_workers=1, max_tasks_per_child=1) is
2313-
created here before the loop and torn down in the finally clause.
2314-
Previously _run_autots created and destroyed an executor on every
2315-
iteration, incurring one full spawn/join cycle per AutoTS call.
2316-
Hoisting the executor out of the loop removes that overhead while
2317-
preserving the CWD isolation guarantee: max_tasks_per_child=1 ensures
2318-
that each task still runs in a freshly spawned child process, so
2319-
os.chdir() inside _autots_worker cannot bleed across iterations.
2314+
created before the loop and torn down in the finally clause.
2315+
If the worker process is force-killed by a timeout, the executor enters
2316+
a broken state. The broken executor is detected via BrokenProcessPool,
2317+
shut down, and replaced with a fresh one so that subsequent iterations
2318+
can continue normally. max_tasks_per_child=1 ensures that each task
2319+
still runs in a freshly spawned child process, so os.chdir() inside
2320+
_autots_worker cannot bleed across iterations.
23202321
"""
2321-
# ADDED: create the executor once for the entire sequential run.
2322-
# max_tasks_per_child=1 keeps per-task process isolation (os.chdir safety).
2323-
executor = ProcessPoolExecutor(
2324-
max_workers=1,
2325-
mp_context=self._mp_ctx,
2326-
max_tasks_per_child=1,
2327-
)
2328-
try: # ADDED: wrapping try/finally to guarantee executor.shutdown()
2322+
from concurrent.futures import BrokenProcessPool
2323+
2324+
def _make_executor() -> ProcessPoolExecutor:
2325+
return ProcessPoolExecutor(
2326+
max_workers=1,
2327+
mp_context=self._mp_ctx,
2328+
max_tasks_per_child=1,
2329+
)
2330+
2331+
executor = _make_executor()
2332+
try:
23292333
while True:
23302334
# ── stop.txt sentinel file ────────────────────────────────────
23312335
if os.path.isfile(os.path.join(self.output_dir, "stop.txt")):
@@ -2369,18 +2373,33 @@ def _run_sequential(self, history_log: str, priority_log: str) -> None:
23692373
# ── Task execution ────────────────────────────────────────────
23702374
self._iteration += 1
23712375
self.graph.last_iteration = self._iteration
2372-
self._append_history(history_log, self._iteration, task)
23732376

23742377
run_dir = self._make_run_dir(task)
23752378
try:
2376-
profile_dirs = self._run_autots(task, run_dir, executor) # CHANGED: pass executor
2379+
profile_dirs = self._run_autots(task, run_dir, executor)
2380+
except BrokenProcessPool:
2381+
# The executor was broken by a prior force-kill (worker timeout).
2382+
# Shut it down and spin up a fresh one so subsequent iterations
2383+
# are not permanently blocked.
2384+
logger.warning(
2385+
"_run_sequential: executor broken after force-kill — "
2386+
"recreating ProcessPoolExecutor and marking iteration %06d as FAILED.",
2387+
self._iteration,
2388+
)
2389+
executor.shutdown(wait=False)
2390+
executor = _make_executor()
2391+
self.queue.release((task.node_id, tuple(task.afir_params)))
2392+
self._append_history(history_log, self._iteration, task, "FAILED")
2393+
self._finalize_iteration(run_dir, task, "FAILED", [], priority_log)
2394+
continue
23772395
except Exception as exc:
23782396
logger.error("AutoTS failed for run %s: %s", run_dir, exc)
23792397
# Do not call explored_log.record() on failure.
23802398
# _in_flight (set by pop()) prevents duplicates within the
23812399
# current run. Omitting record() allows transient failures
23822400
# (OOM, segfault, etc.) to be retried on resume.
23832401
self.queue.release((task.node_id, tuple(task.afir_params)))
2402+
self._append_history(history_log, self._iteration, task, "FAILED")
23842403
self._finalize_iteration(run_dir, task, "FAILED", [], priority_log)
23852404
continue
23862405

@@ -2402,14 +2421,10 @@ def _run_sequential(self, history_log: str, priority_log: str) -> None:
24022421
if hasattr(self.queue, "set_graph"):
24032422
self.queue.set_graph(self.graph)
24042423

2424+
self._append_history(history_log, self._iteration, task, "DONE")
24052425
self._finalize_iteration(run_dir, task, "DONE", profile_dirs, priority_log)
24062426

24072427
finally:
2408-
# ADDED: shut down the shared executor once the loop exits for any reason
2409-
# (exhausted, max_iterations, stop.txt, or unhandled exception).
2410-
# wait=True performs a clean join on any still-running worker, which is
2411-
# always safe here because the loop only reaches finally after the
2412-
# current task has either completed or been force-killed in _run_autots.
24132428
executor.shutdown(wait=True)
24142429

24152430
def _append_history(
@@ -2570,7 +2585,21 @@ def _try_submit() -> bool:
25702585
except Exception as exc:
25712586
logger.warning("Could not write config_used.json: %s", exc)
25722587

2573-
future = executor.submit(_autots_worker, config, run_dir, workspace)
2588+
try:
2589+
future = executor.submit(_autots_worker, config, run_dir, workspace)
2590+
except BrokenProcessPool as exc:
2591+
# The pool died between the last result and this submit.
2592+
# Release the task so it can be retried on resume, then
2593+
# stop submitting — remaining in-flight futures will drain
2594+
# naturally through _handle_done's except-Exception branch.
2595+
logger.error(
2596+
"_try_submit: process pool is broken (%s) — "
2597+
"releasing task (EQ%06d %s) and halting submission.",
2598+
exc, task.node_id, task.afir_params,
2599+
)
2600+
self.queue.release((task.node_id, tuple(task.afir_params)))
2601+
exhausted = True
2602+
return False
25742603
futures_map[future] = (
25752604
task, run_dir, self._iteration, gamma_sign, atom_i, atom_j
25762605
)
@@ -2674,7 +2703,18 @@ def _handle_done(future) -> None:
26742703
_handle_done(future)
26752704
# Immediately submit a replacement task if one is available
26762705
if not exhausted and not _should_stop():
2677-
_try_submit()
2706+
try:
2707+
_try_submit()
2708+
except BrokenProcessPool as exc:
2709+
# Defensive catch: _try_submit normally handles this
2710+
# internally, but guard here too so the drain loop
2711+
# can finish processing remaining futures safely.
2712+
logger.error(
2713+
"_run_parallel_rolling: BrokenProcessPool "
2714+
"escaped _try_submit (%s) — halting submission.",
2715+
exc,
2716+
)
2717+
exhausted = True
26782718

26792719
finally:
26802720
executor.shutdown(wait=not timed_out, cancel_futures=timed_out)

0 commit comments

Comments
 (0)