Skip to content

Commit 5795663

Browse files
authored
Add files via upload
1 parent a67bc8a commit 5795663

1 file changed

Lines changed: 134 additions & 50 deletions

File tree

multioptpy/Wrapper/mapper.py

Lines changed: 134 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,15 @@
2727
from concurrent.futures import (
2828
ProcessPoolExecutor,
2929
as_completed,
30-
BrokenProcessPool,
3130
TimeoutError as FuturesTimeoutError,
3231
)
32+
try:
33+
from concurrent.futures import BrokenProcessPool
34+
except ImportError:
35+
try:
36+
from concurrent.futures.process import BrokenProcessPool # type: ignore[no-redef]
37+
except ImportError:
38+
BrokenProcessPool = RuntimeError # type: ignore[misc,assignment]
3339
from dataclasses import dataclass, field
3440
from typing import Any
3541

@@ -2469,16 +2475,18 @@ def _run_parallel_rolling(
24692475
history_log: str,
24702476
priority_log: str,
24712477
) -> bool:
2472-
"""Rolling-window parallel execution.
2478+
"""Rolling-window parallel execution with automatic pool reconstruction.
24732479
24742480
Maintains up to ``n_parallel`` workers simultaneously. The moment
24752481
any worker finishes, its result is processed *immediately* (so newly
24762482
discovered EQ nodes are added to the queue right away), and a fresh
24772483
task is submitted to keep the pool full.
24782484
2479-
This replaces the old ``_collect_task_batch`` + ``_run_batch_parallel``
2480-
pattern, which caused a "big bang / big wait" rhythm where no new
2481-
tasks could start until every worker in the current batch had finished.
2485+
If the process pool is broken by an external signal (OOM killer, etc.),
2486+
the pool is transparently rebuilt and exploration continues up to
2487+
``max_iterations``. At most ``_MAX_POOL_REBUILDS`` consecutive
2488+
reconstructions are attempted; if this limit is reached the method
2489+
exits gracefully with the results collected so far.
24822490
24832491
Timeout semantics
24842492
-----------------
@@ -2497,22 +2505,25 @@ def _run_parallel_rolling(
24972505
a ``stop.txt`` sentinel was detected.
24982506
"""
24992507
from concurrent.futures import wait as _fut_wait, FIRST_COMPLETED
2500-
try:
2501-
from concurrent.futures.process import BrokenProcessPool as _BrokenProcessPool
2502-
except ImportError:
2503-
# Fallback: catch all submit-time errors as pool-broken
2504-
_BrokenProcessPool = RuntimeError # type: ignore[misc,assignment]
2505-
2506-
executor = ProcessPoolExecutor(
2507-
max_workers=self.n_parallel,
2508-
mp_context=self._mp_ctx,
2509-
max_tasks_per_child=1,
2510-
)
2508+
2509+
_MAX_SKIP_EXPLORED = 256 # guard against infinite skip loop
2510+
_MAX_POOL_REBUILDS = 10 # safety ceiling on pool reconstruction attempts
2511+
2512+
# ── Pool factory ───────────────────────────────────────────────────
2513+
def _new_executor() -> ProcessPoolExecutor:
2514+
return ProcessPoolExecutor(
2515+
max_workers=self.n_parallel,
2516+
mp_context=self._mp_ctx,
2517+
max_tasks_per_child=1,
2518+
)
2519+
2520+
executor: ProcessPoolExecutor = _new_executor()
25112521
# future → (task, run_dir, iteration, gamma_sign, atom_i, atom_j)
2512-
futures_map: dict = {}
2522+
futures_map: dict = {}
25132523
timed_out = False
25142524
exhausted = False
2515-
_MAX_SKIP_EXPLORED = 256 # guard against infinite skip loop
2525+
pool_broken = False # set when BrokenProcessPool is detected
2526+
pool_rebuild_count = 0
25162527

25172528
# ── Inner helpers ──────────────────────────────────────────────────
25182529

@@ -2537,10 +2548,13 @@ def _should_stop() -> bool:
25372548
def _try_submit() -> bool:
25382549
"""Pop one task and submit it to the executor.
25392550
2540-
Returns True when a future was successfully submitted,
2541-
False when the queue is exhausted or the stop condition is met.
2551+
Returns True when a future was successfully submitted.
2552+
Returns False and sets ``pool_broken=True`` if the process pool
2553+
has died; the caller is responsible for triggering a rebuild.
2554+
Also returns False if the queue is exhausted or a stop condition
2555+
is met.
25422556
"""
2543-
nonlocal exhausted
2557+
nonlocal exhausted, pool_broken
25442558

25452559
if _should_stop():
25462560
return False
@@ -2592,18 +2606,14 @@ def _try_submit() -> bool:
25922606

25932607
try:
25942608
future = executor.submit(_autots_worker, config, run_dir, workspace)
2595-
except _BrokenProcessPool as exc:
2596-
# The pool died between the last result and this submit.
2597-
# Release the task so it can be retried on resume, then
2598-
# stop submitting — remaining in-flight futures will drain
2599-
# naturally through _handle_done's except-Exception branch.
2609+
except BrokenProcessPool as exc:
26002610
logger.error(
26012611
"_try_submit: process pool is broken (%s) — "
2602-
"releasing task (EQ%06d %s) and halting submission.",
2612+
"releasing task (EQ%06d %s); pool will be rebuilt.",
26032613
exc, task.node_id, task.afir_params,
26042614
)
26052615
self.queue.release((task.node_id, tuple(task.afir_params)))
2606-
exhausted = True
2616+
pool_broken = True
26072617
return False
26082618
futures_map[future] = (
26092619
task, run_dir, self._iteration, gamma_sign, atom_i, atom_j
@@ -2620,7 +2630,12 @@ def _try_submit() -> bool:
26202630
return False
26212631

26222632
def _handle_done(future) -> None:
2623-
"""Process a completed future and update the graph."""
2633+
"""Process a completed future and update the graph.
2634+
2635+
Distinguishes BrokenProcessPool (sets pool_broken flag so the
2636+
main loop can trigger a rebuild) from ordinary worker failures.
2637+
"""
2638+
nonlocal pool_broken
26242639
task, run_dir, iteration, gamma_sign, atom_i, atom_j = (
26252640
futures_map.pop(future)
26262641
)
@@ -2630,6 +2645,17 @@ def _handle_done(future) -> None:
26302645
task, run_dir, profile_dirs, "DONE", iteration,
26312646
history_log, gamma_sign, atom_i, atom_j,
26322647
)
2648+
except BrokenProcessPool as exc:
2649+
logger.error(
2650+
"_run_parallel_rolling: BrokenProcessPool for %s: %s "
2651+
"— marking FAILED, pool will be rebuilt.",
2652+
run_dir, exc,
2653+
)
2654+
pool_broken = True
2655+
self._process_single_result(
2656+
task, run_dir, [], "FAILED", iteration,
2657+
history_log, gamma_sign, atom_i, atom_j,
2658+
)
26332659
except Exception as exc:
26342660
logger.error(
26352661
"_run_parallel_rolling: worker failed for %s: %s", run_dir, exc
@@ -2639,20 +2665,89 @@ def _handle_done(future) -> None:
26392665
history_log, gamma_sign, atom_i, atom_j,
26402666
)
26412667

2668+
def _drain_broken_futures() -> None:
2669+
"""Mark all in-flight futures as FAILED without calling future.result().
2670+
2671+
Called when the pool is already known to be broken, so we skip
2672+
the (would-raise) result() call and process everything as FAILED
2673+
immediately, freeing futures_map for the rebuilt pool.
2674+
"""
2675+
for f in list(futures_map):
2676+
task, run_dir, iteration, gamma_sign, atom_i, atom_j = (
2677+
futures_map.pop(f)
2678+
)
2679+
logger.error(
2680+
"_drain_broken_futures: marking in-flight run as FAILED: %s",
2681+
run_dir,
2682+
)
2683+
try:
2684+
self._process_single_result(
2685+
task, run_dir, [], "FAILED", iteration,
2686+
history_log, gamma_sign, atom_i, atom_j,
2687+
)
2688+
except Exception as exc:
2689+
logger.error(
2690+
"_drain_broken_futures: _process_single_result error "
2691+
"for %s: %s — releasing queue key manually.",
2692+
run_dir, exc,
2693+
)
2694+
self.queue.release(
2695+
(task.node_id, tuple(task.afir_params))
2696+
)
2697+
2698+
def _rebuild_pool() -> None:
2699+
"""Shut down the broken executor and start a fresh one."""
2700+
nonlocal executor, pool_broken, pool_rebuild_count
2701+
try:
2702+
executor.shutdown(wait=False, cancel_futures=True)
2703+
except Exception as exc:
2704+
logger.debug(
2705+
"_rebuild_pool: executor.shutdown raised %s (ignored).", exc
2706+
)
2707+
pool_rebuild_count += 1
2708+
executor = _new_executor()
2709+
pool_broken = False
2710+
logger.warning(
2711+
"_run_parallel_rolling: process pool rebuilt "
2712+
"(rebuild #%d / max %d).",
2713+
pool_rebuild_count, _MAX_POOL_REBUILDS,
2714+
)
2715+
26422716
# ── Execution ──────────────────────────────────────────────────────
26432717
try:
26442718
# Initial fill: submit up to n_parallel tasks
26452719
for _ in range(self.n_parallel):
26462720
if not _try_submit():
26472721
break
26482722

2649-
# Rolling loop
2650-
while futures_map:
2651-
# ── stop.txt check (between completions) ─────────────────
2652-
# New task submission is suppressed (_should_stop() returns
2653-
# True), but already-running workers are allowed to finish
2654-
# and their results are processed normally so no work is lost.
2655-
# The loop exits naturally once futures_map is drained.
2723+
# Rolling loop — continues while work is in flight OR a pool
2724+
# rebuild is pending (futures_map will be empty right after a
2725+
# drain but we still need to refill the fresh pool).
2726+
while futures_map or (
2727+
pool_broken
2728+
and not _should_stop()
2729+
and not exhausted
2730+
and pool_rebuild_count < _MAX_POOL_REBUILDS
2731+
):
2732+
2733+
# ── Pool-rebuild branch ────────────────────────────────────
2734+
if pool_broken:
2735+
if pool_rebuild_count >= _MAX_POOL_REBUILDS:
2736+
logger.error(
2737+
"_run_parallel_rolling: maximum pool rebuilds (%d) "
2738+
"reached — stopping exploration.",
2739+
_MAX_POOL_REBUILDS,
2740+
)
2741+
break
2742+
_drain_broken_futures() # empties futures_map
2743+
_rebuild_pool() # replaces executor, clears pool_broken
2744+
# Refill the fresh pool up to n_parallel slots
2745+
for _ in range(self.n_parallel):
2746+
if _should_stop() or exhausted or pool_broken:
2747+
break
2748+
if not _try_submit():
2749+
break
2750+
continue
26562751

26572752
# ── Wait for the next completed future ────────────────────
26582753
if self.worker_timeout_s is not None:
@@ -2706,20 +2801,9 @@ def _handle_done(future) -> None:
27062801
# ── Process each completed future, then refill ─────────────
27072802
for future in done:
27082803
_handle_done(future)
2709-
# Immediately submit a replacement task if one is available
2710-
if not exhausted and not _should_stop():
2711-
try:
2712-
_try_submit()
2713-
except _BrokenProcessPool as exc:
2714-
# Defensive catch: _try_submit normally handles this
2715-
# internally, but guard here too so the drain loop
2716-
# can finish processing remaining futures safely.
2717-
logger.error(
2718-
"_run_parallel_rolling: BrokenProcessPool "
2719-
"escaped _try_submit (%s) — halting submission.",
2720-
exc,
2721-
)
2722-
exhausted = True
2804+
# Submit a replacement only when the pool is healthy
2805+
if not exhausted and not pool_broken and not _should_stop():
2806+
_try_submit()
27232807

27242808
finally:
27252809
executor.shutdown(wait=not timed_out, cancel_futures=timed_out)

0 commit comments

Comments
 (0)