@@ -502,7 +502,7 @@ def _so3_grid(n: int) -> list[np.ndarray]:
     k = np.arange(n)
     alphas = 2.0 * np.pi * k / n
     betas = np.arccos(np.clip(1.0 - 2.0 * (k + 0.5) / n, -1.0, 1.0))
-    gammas = 2.0 * np.pi * k / n
+    gammas = 2.0 * np.pi * (k + 0.5) / n
 
     ca, sa = np.cos(alphas), np.sin(alphas)
     cb, sb = np.cos(betas), np.sin(betas)
@@ -2301,14 +2301,10 @@ def run(self) -> None:
                          if self.max_iterations > 0 else self.n_parallel)
             if remaining <= 0:
                 break
-            batch = self._collect_task_batch(min(self.n_parallel, remaining))
-            if not batch:
+            exhausted = self._run_parallel_rolling(history_log, priority_log)
+            if exhausted:
                 logger.info("All candidate (EQ, pair) combinations exhausted. Stopping.")
-                break
-            self._iteration += len(batch)
-            self.graph.last_iteration = self._iteration
-            self._run_batch_parallel(batch, history_log, priority_log)
-            continue
+            break
 
             # ── Sequential path (n_parallel == 1) ────────────────────
             # Re-weight the queue correctly after a new lowest-energy
@@ -2415,6 +2411,248 @@ def _finalize_iteration(
         self.graph.save(self.graph_json_path)
         self._write_priority_log(priority_log)
 
+
+    # ------------------------------------------------------------------ #
+    #  Rolling-window parallel execution                                  #
+    # ------------------------------------------------------------------ #
+
+    def _run_parallel_rolling(
+        self,
+        history_log: str,
+        priority_log: str,
+    ) -> bool:
+        """Rolling-window parallel execution.
+
+        Maintains up to ``n_parallel`` workers simultaneously. The moment
+        any worker finishes, its result is processed *immediately* (so newly
+        discovered EQ nodes are added to the queue right away), and a fresh
+        task is submitted to keep the pool full.
+
+        This replaces the old ``_collect_task_batch`` + ``_run_batch_parallel``
+        pattern, which caused a "big bang / big wait" rhythm where no new
+        tasks could start until every worker in the current batch had finished.
+
+        Timeout semantics
+        -----------------
+        ``worker_timeout_s`` is applied **per worker** (via
+        ``wait(timeout=worker_timeout_s, return_when=FIRST_COMPLETED)``).
+        If a single ``wait`` call returns an empty ``done`` set, every
+        still-running worker is force-killed and the whole pool is drained as
+        TIMEOUT, then the loop exits. This is consistent with the sequential
+        path's per-worker timeout in ``_run_autots``.
+
+        Returns
+        -------
+        bool
+            ``True``  → queue exhausted (all pairs have been explored).
+            ``False`` → stopped because ``max_iterations`` was reached or
+            a ``stop.txt`` sentinel was detected.
+        """
+        from concurrent.futures import wait as _fut_wait, FIRST_COMPLETED
+
+        executor = ProcessPoolExecutor(
+            max_workers=self.n_parallel,
+            mp_context=self._mp_ctx,
+            max_tasks_per_child=1,
+        )
+        # future → (task, run_dir, iteration, gamma_sign, atom_i, atom_j)
+        futures_map: dict = {}
+        timed_out = False
+        exhausted = False
+        _MAX_SKIP_EXPLORED = 256  # guard against infinite skip loop
+
+        # ── Inner helpers ──────────────────────────────────────────────────
+
+        _stop_logged = False
+
+        def _should_stop() -> bool:
+            """True when the outer loop should not submit more tasks."""
+            nonlocal _stop_logged
+            if os.path.isfile(os.path.join(self.output_dir, "stop.txt")):
+                if not _stop_logged:
+                    logger.info(
+                        "stop.txt detected — %d in-flight worker(s) will run to "
+                        "completion; no further tasks will be submitted.",
+                        len(futures_map),
+                    )
+                    _stop_logged = True
+                return True
+            if self.max_iterations > 0 and self._iteration >= self.max_iterations:
+                return True
+            return False
+
+        def _try_submit() -> bool:
+            """Pop one task and submit it to the executor.
+
+            Returns True when a future was successfully submitted,
+            False when the queue is exhausted or the stop condition is met.
+            """
+            nonlocal exhausted
+
+            if _should_stop():
+                return False
+
+            self.queue.refresh_priorities(self.graph.reference_energy())
+
+            # Skip already-explored tasks (iterative, not recursive)
+            for _ in range(_MAX_SKIP_EXPLORED):
+                task = self.queue.pop()
+
+                if task is None:
+                    # Queue empty — try to refill once
+                    size_before = len(self.queue)
+                    for node in self.graph.all_nodes():
+                        self._enqueue_perturbations(node, force_add=True)
+                    if len(self.queue) == size_before:
+                        exhausted = True
+                        return False  # truly nothing left
+                    self.queue.refresh_priorities(self.graph.reference_energy())
+                    task = self.queue.pop()
+                    if task is None:
+                        exhausted = True
+                        return False
+
+                gamma_sign, atom_i, atom_j = self._parse_afir_task_key(task)
+                if self.explored_log.has(task.node_id, atom_i, atom_j, gamma_sign):
+                    logger.debug(
+                        "_run_parallel_rolling: skipping already-explored task "
+                        "(EQ%06d, %d-%d, %s).",
+                        task.node_id, atom_i, atom_j, gamma_sign,
+                    )
+                    self.queue.release((task.node_id, tuple(task.afir_params)))
+                    continue  # try next task
+
+                # Valid task — submit it
+                self._iteration += 1
+                self.graph.last_iteration = self._iteration
+
+                run_dir = self._make_run_dir(task, iteration=self._iteration)
+                workspace = os.path.join(run_dir, "autots_workspace")
+                config = self._make_autots_config(task, workspace)
+                try:
+                    with open(
+                        os.path.join(run_dir, "config_used.json"), "w", encoding="utf-8"
+                    ) as fh:
+                        json.dump(config, fh, indent=2, default=str)
+                except Exception as exc:
+                    logger.warning("Could not write config_used.json: %s", exc)
+
+                future = executor.submit(_autots_worker, config, run_dir, workspace)
+                futures_map[future] = (
+                    task, run_dir, self._iteration, gamma_sign, atom_i, atom_j
+                )
+                return True
+
+            # Exhausted skip budget — treat as queue empty
+            logger.warning(
+                "_run_parallel_rolling: exceeded %d consecutive already-explored "
+                "skips — treating queue as exhausted.",
+                _MAX_SKIP_EXPLORED,
+            )
+            exhausted = True
+            return False
+
+        def _handle_done(future) -> None:
+            """Process a completed future and update the graph."""
+            task, run_dir, iteration, gamma_sign, atom_i, atom_j = (
+                futures_map.pop(future)
+            )
+            try:
+                profile_dirs = future.result()
+                self._process_single_result(
+                    task, run_dir, profile_dirs, "DONE", iteration,
+                    history_log, gamma_sign, atom_i, atom_j,
+                )
+            except Exception as exc:
+                logger.error(
+                    "_run_parallel_rolling: worker failed for %s: %s", run_dir, exc
+                )
+                self._process_single_result(
+                    task, run_dir, [], "FAILED", iteration,
+                    history_log, gamma_sign, atom_i, atom_j,
+                )
+
+        # ── Execution ──────────────────────────────────────────────────────
+        try:
+            # Initial fill: submit up to n_parallel tasks
+            for _ in range(self.n_parallel):
+                if not _try_submit():
+                    break
+
+            # Rolling loop
+            while futures_map:
+                # ── stop.txt check (between completions) ─────────────────
+                # New task submission is suppressed (_should_stop() returns
+                # True), but already-running workers are allowed to finish
+                # and their results are processed normally so no work is lost.
+                # The loop exits naturally once futures_map is drained.
+
+                # ── Wait for the next completed future ────────────────────
+                if self.worker_timeout_s is not None:
+                    done, _ = _fut_wait(
+                        list(futures_map),
+                        timeout=self.worker_timeout_s,
+                        return_when=FIRST_COMPLETED,
+                    )
+                    if not done:
+                        # Per-worker timeout: at least one worker is stuck
+                        timed_out = True
+                        logger.error(
+                            "_run_parallel_rolling: per-worker timeout (%ds) exceeded "
+                            "— force-killing all %d remaining workers.",
+                            self.worker_timeout_s, len(futures_map),
+                        )
+                        worker_procs = getattr(executor, "_processes", {})
+                        for pid, proc in list(worker_procs.items()):
+                            if proc.is_alive():
+                                logger.warning(
+                                    "_run_parallel_rolling: force-killing pid=%d", pid
+                                )
+                                proc.kill()
+                        # Drain all remaining futures as TIMEOUT
+                        for future in list(futures_map):
+                            task, run_dir, iteration, gamma_sign, atom_i, atom_j = (
+                                futures_map.pop(future)
+                            )
+                            logger.error(
+                                "_run_parallel_rolling: TIMEOUT for %s", run_dir
+                            )
+                            try:
+                                self._process_single_result(
+                                    task, run_dir, [], "TIMEOUT", iteration,
+                                    history_log, gamma_sign, atom_i, atom_j,
+                                )
+                            except Exception as exc:
+                                logger.error(
+                                    "_process_single_result after TIMEOUT (%s): %s",
+                                    run_dir, exc,
+                                )
+                                self.queue.release(
+                                    (task.node_id, tuple(task.afir_params))
+                                )
+                        break
+                else:
+                    done, _ = _fut_wait(
+                        list(futures_map), return_when=FIRST_COMPLETED
+                    )
+
+                # ── Process each completed future, then refill ─────────────
+                for future in done:
+                    _handle_done(future)
+                    # Immediately submit a replacement task if one is available
+                    if not exhausted and not _should_stop():
+                        _try_submit()
+
+        finally:
+            executor.shutdown(wait=not timed_out, cancel_futures=timed_out)
+            try:
+                self.graph.save(self.graph_json_path)
+                self._write_priority_log(priority_log)
+            except Exception as exc:
+                logger.error("_run_parallel_rolling: cleanup save/log failed: %s", exc)
+
+        return exhausted
+
     # ------------------------------------------------------------------ #
     #  Batch collection and parallel execution                           #
     # ------------------------------------------------------------------ #
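
The rolling-window refill pattern that _run_parallel_rolling builds on can be reduced to a short, self-contained sketch using only the standard library. The snippet below is illustrative only and not part of this commit; the work function, tasks iterable, and N_PARALLEL constant are hypothetical stand-ins for the real worker, task queue, and n_parallel setting.

# Minimal sketch of the rolling-window refill pattern (illustrative only;
# `work`, `tasks`, and N_PARALLEL are hypothetical stand-ins).
from concurrent.futures import ProcessPoolExecutor, wait, FIRST_COMPLETED

N_PARALLEL = 4

def work(x):
    # Stand-in for one long-running job (e.g. a single worker run).
    return x * x

def run_rolling(tasks):
    pending = list(tasks)
    results = []
    with ProcessPoolExecutor(max_workers=N_PARALLEL) as pool:
        # Initial fill: keep at most N_PARALLEL futures in flight.
        running = {pool.submit(work, pending.pop())
                   for _ in range(min(N_PARALLEL, len(pending)))}
        while running:
            # Wake up as soon as ANY future finishes, not the whole batch.
            done, running = wait(running, return_when=FIRST_COMPLETED)
            for fut in done:
                results.append(fut.result())
                # Refill immediately so the pool never sits idle.
                if pending:
                    running.add(pool.submit(work, pending.pop()))
    return results

if __name__ == "__main__":
    print(run_rolling(range(10)))

Unlike the removed batch variant, no completed slot waits for its siblings: each finished future frees a slot that is refilled in the same loop pass.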