Skip to content

Commit a8a8f14

Browse files
authored
Add files via upload
1 parent 12cc15b commit a8a8f14

1 file changed

Lines changed: 83 additions & 109 deletions

File tree

multioptpy/Wrapper/mapper.py

Lines changed: 83 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
from collections import Counter
2727
from concurrent.futures import (
2828
ProcessPoolExecutor,
29-
as_completed,
3029
TimeoutError as FuturesTimeoutError,
3130
)
3231
try:
@@ -131,7 +130,6 @@ def _autots_worker(config: dict, run_dir: str, workspace: str) -> list[str]:
131130
return sorted(_glob.glob(pattern, recursive=True))
132131

133132

134-
135133
logger = logging.getLogger(__name__)
136134

137135
# Module-level physical constants
@@ -152,14 +150,6 @@ def _autots_worker(config: dict, run_dir: str, workspace: str) -> list[str]:
152150
)
153151

154152

155-
def get_pattern_xyz() -> re.Pattern:
156-
"""Return the compiled XYZ atom-line regex.
157-
158-
Kept for backwards compatibility. Callers are encouraged to reference
159-
``_XYZ_PATTERN`` directly. No recompilation occurs on each call.
160-
"""
161-
return _XYZ_PATTERN
162-
163153

164154
def parse_xyz(filepath: str) -> tuple[list[str], np.ndarray]:
165155
with open(filepath, "r", encoding="utf-8") as fh:
@@ -218,9 +208,6 @@ def parse_xyz(filepath: str) -> tuple[list[str], np.ndarray]:
218208
return symbols, np.array(coords_raw)
219209

220210

221-
def distance_matrix(coords: np.ndarray) -> np.ndarray:
222-
# Using cdist avoids the intermediate (N,N,3) array from manual broadcasting.
223-
return cdist(coords, coords)
224211

225212

226213
# ===========================================================================
@@ -694,7 +681,7 @@ def atom_environments(
694681
the Hungarian algorithm considers only chemically valid permutations.
695682
"""
696683
n = len(symbols)
697-
bonded_si, bonded_sj, ii_idx, jj_idx = self._bonded_pairs(
684+
_, _, ii_idx, jj_idx = self._bonded_pairs(
698685
symbols, coords, return_indices=True
699686
)
700687

@@ -799,9 +786,9 @@ def __init__(self, rng_seed: int = 42) -> None:
799786
self._tasks: dict[int, ExplorationTask] = {}
800787

801788
# _task_counter: monotonically increasing integer incremented on push().
802-
# Using a stable counter instead of id(task) prevents a bug where a
803-
# GC'd task's memory address is reused by a new object, causing
804-
# _tasks.pop() to silently delete a live entry.
789+
# A stable counter prevents a GC'd task's memory address from being
790+
# reused by a new object, which would cause _tasks.pop() to silently
791+
# delete a live entry.
805792
self._task_counter: int = 0
806793

807794
# _heap: min-heap for O(log N) extraction.
@@ -953,7 +940,7 @@ def refresh_priorities(self, ref_e: float | None) -> None:
953940
"""Record the new reference energy.
954941
955942
Actual delta_E recomputation and priority updates are performed lazily
956-
inside pop() (O(1)). The former O(N) full-heap rebuild is removed.
943+
inside pop() (O(1)).
957944
958945
Correctness note: BoltzmannQueue.compute_priority() is monotonically
959946
decreasing in delta_E, so the heap order established at push() time
@@ -1114,14 +1101,6 @@ def has_any_for_node(self, node_id: int) -> bool:
11141101
"""
11151102
return node_id in self._explored_node_ids
11161103

1117-
def contains(self, key: tuple[int, int, int, str]) -> bool:
1118-
"""Return ``True`` if the exact (node_id, atom_i, atom_j, gamma_sign) tuple exists.
1119-
1120-
Convenience wrapper for callers that have already constructed the key
1121-
for other purposes.
1122-
"""
1123-
return key in self._explored
1124-
11251104
def record(self, node_id: int, atom_i: int, atom_j: int, gamma_sign: str) -> None:
11261105
"""Mark the combination as explored and append it to the text file."""
11271106
key = (node_id, atom_i, atom_j, gamma_sign)
@@ -1161,7 +1140,6 @@ def __init__(
11611140
max_pairs: int = 5,
11621141
dist_lower_ang: float = 1.5,
11631142
dist_upper_ang: float = 5.0,
1164-
rng_seed: int = 0,
11651143
covalent_margin: float = 1.2,
11661144
active_atoms: list[int] | None = None,
11671145
include_negative_gamma: bool = False,
@@ -1176,8 +1154,6 @@ def __init__(
11761154
Maximum number of atom pairs sampled per call.
11771155
dist_lower_ang / dist_upper_ang : float
11781156
Distance window [Å] for candidate pair selection.
1179-
rng_seed : int
1180-
NumPy RNG seed for reproducibility.
11811157
covalent_margin : float
11821158
Pairs closer than ``covalent_margin * (r_i + r_j)`` are skipped
11831159
(already covalently bonded).
@@ -1196,7 +1172,6 @@ def __init__(
11961172
self.covalent_margin = covalent_margin
11971173
self.active_atoms = set(active_atoms) if active_atoms is not None else None
11981174
self.include_negative_gamma = include_negative_gamma
1199-
self._rng = np.random.default_rng(rng_seed)
12001175

12011176
def _build_candidates(
12021177
self,
@@ -1259,9 +1234,8 @@ def get_candidate_pairs(
12591234
) -> list[tuple[int, int]]:
12601235
"""Return all valid non-covalent atom pairs without sampling.
12611236
1262-
Unlike :meth:`generate_afir_perturbations`, this method returns the
1263-
full candidate pool so that callers can implement their own sampling
1264-
strategy.
1237+
Returns the full candidate pool so that callers can implement their own
1238+
sampling strategy.
12651239
12661240
Returns
12671241
-------
@@ -1270,38 +1244,6 @@ def get_candidate_pairs(
12701244
"""
12711245
return self._build_candidates(symbols, coords)
12721246

1273-
def generate_afir_perturbations(
1274-
self,
1275-
symbols: list[str],
1276-
coords: np.ndarray,
1277-
) -> list[list[str]]:
1278-
"""Return AFIR parameter lists for AutoTSWorkflow step1.
1279-
1280-
Each entry has the form ``[gamma_str, atom_i_1based, atom_j_1based]``.
1281-
When ``include_negative_gamma`` is ``True``, each selected pair is
1282-
duplicated with a negative gamma value to explore both attractive and
1283-
repulsive directions. Maximum total entries: ``2 * max_pairs``.
1284-
"""
1285-
candidates = self._build_candidates(symbols, coords)
1286-
if not candidates:
1287-
return []
1288-
1289-
n_sel = min(self.max_pairs, len(candidates))
1290-
chosen = self._rng.choice(len(candidates), size=n_sel, replace=False)
1291-
1292-
pos_gamma_str = f"{self.afir_gamma_kJmol:.6g}"
1293-
neg_gamma_str = f"{-self.afir_gamma_kJmol:.6g}"
1294-
1295-
result: list[list[str]] = []
1296-
for idx in chosen:
1297-
i, j = candidates[int(idx)]
1298-
i1, j1 = str(i + 1), str(j + 1)
1299-
result.append([pos_gamma_str, i1, j1])
1300-
if self.include_negative_gamma:
1301-
result.append([neg_gamma_str, i1, j1])
1302-
1303-
return result
1304-
13051247

13061248
# ===========================================================================
13071249
# Section 5 : Graph data model
@@ -2160,9 +2102,10 @@ def __init__(
21602102
temperature_K=float(ms.get("rcmc_temperature_K", temperature_K)),
21612103
reaction_time_s=float(ms.get("rcmc_reaction_time_s", 1.0)),
21622104
start_node_id=ms.get("rcmc_start_node_id", None),
2105+
rng_seed=rng_seed,
21632106
)
21642107
else:
2165-
self.queue = BoltzmannQueue(temperature_K=temperature_K)
2108+
self.queue = BoltzmannQueue(temperature_K=temperature_K, rng_seed=rng_seed)
21662109

21672110
# The same checker instance is used for both EQ and TS comparisons.
21682111
# EQ: _find_or_register_node searches graph.all_nodes()
@@ -2175,7 +2118,6 @@ def __init__(
21752118
dist_upper_ang=dist_upper_ang,
21762119
include_negative_gamma=include_negative_gamma,
21772120
active_atoms=active_atoms,
2178-
rng_seed=rng_seed,
21792121
)
21802122

21812123
self.output_dir = os.path.abspath(output_dir)
@@ -2400,7 +2342,17 @@ def _make_executor() -> ProcessPoolExecutor:
24002342
self._iteration += 1
24012343
self.graph.last_iteration = self._iteration
24022344

2403-
run_dir = self._make_run_dir(task)
2345+
try:
2346+
run_dir = self._make_run_dir(task)
2347+
except Exception as exc:
2348+
logger.error(
2349+
"_run_sequential: failed to create run directory for "
2350+
"iteration %06d: %s — releasing task.",
2351+
self._iteration, exc,
2352+
)
2353+
self.queue.release((task.node_id, tuple(task.afir_params)))
2354+
continue
2355+
24042356
try:
24052357
profile_dirs = self._run_autots(task, run_dir, executor)
24062358
except BrokenProcessPool:
@@ -2412,7 +2364,7 @@ def _make_executor() -> ProcessPoolExecutor:
24122364
"recreating ProcessPoolExecutor and marking iteration %06d as FAILED.",
24132365
self._iteration,
24142366
)
2415-
executor.shutdown(wait=False)
2367+
executor.shutdown(wait=False, cancel_futures=True)
24162368
executor = _make_executor()
24172369
self.queue.release((task.node_id, tuple(task.afir_params)))
24182370
self._append_history(history_log, self._iteration, task, "FAILED")
@@ -2439,10 +2391,10 @@ def _make_executor() -> ProcessPoolExecutor:
24392391
for pdir in profile_dirs:
24402392
self._process_profile(pdir, run_dir)
24412393
# Persist the exploration record only after confirming success
2442-
# (profile processing complete). Placing record() before
2443-
# _process_profile() — the previous order — would mark the task
2444-
# as explored even when _process_profile raises (e.g. disk full
2445-
# in _persist_node_xyz), making it non-retryable on resume.
2394+
# (profile processing complete). Calling record() before
2395+
# _process_profile() would mark the task as explored even when
2396+
# _process_profile raises (e.g. disk full in _persist_node_xyz),
2397+
# making it non-retryable on resume.
24462398
# Must mirror the parallel path in _process_single_result.
24472399
self.explored_log.record(task.node_id, atom_i, atom_j, gamma_sign)
24482400
except Exception as exc:
@@ -2557,9 +2509,8 @@ def _new_executor() -> ProcessPoolExecutor:
25572509
)
25582510

25592511
executor: ProcessPoolExecutor = _new_executor()
2560-
# future → (task, run_dir, iteration, gamma_sign, atom_i, atom_j)
2512+
# future → (task, run_dir, iteration, gamma_sign, atom_i, atom_j, submit_time)
25612513
futures_map: dict = {}
2562-
timed_out = False
25632514
exhausted = False
25642515
pool_broken = False # set when BrokenProcessPool is detected
25652516
pool_rebuild_count = 0
@@ -2637,7 +2588,16 @@ def _try_submit() -> bool:
26372588

26382589
self._iteration += 1
26392590
self.graph.last_iteration = self._iteration
2640-
run_dir = self._make_run_dir(task, iteration=self._iteration)
2591+
try:
2592+
run_dir = self._make_run_dir(task, iteration=self._iteration)
2593+
except Exception as exc:
2594+
logger.error(
2595+
"_try_submit: failed to create run directory for "
2596+
"iteration %06d: %s — releasing task.",
2597+
self._iteration, exc,
2598+
)
2599+
self.queue.release((task.node_id, tuple(task.afir_params)))
2600+
continue
26412601
workspace = os.path.join(run_dir, "autots_workspace")
26422602
config = self._make_autots_config(task, workspace)
26432603
try:
@@ -2658,6 +2618,14 @@ def _try_submit() -> bool:
26582618
)
26592619
self.queue.release((task.node_id, tuple(task.afir_params)))
26602620
pool_broken = True
2621+
# Record the failed iteration so history and run_info.json
2622+
# remain consistent even when submit itself raises.
2623+
self._append_history(history_log, self._iteration, task, "FAILED")
2624+
self._finalize_iteration(
2625+
run_dir, task, "FAILED", [],
2626+
os.path.join(self.output_dir, "queue_priority.log"),
2627+
self._iteration,
2628+
)
26612629
return False
26622630

26632631
futures_map[future] = (
@@ -2784,6 +2752,7 @@ def _rebuild_pool() -> None:
27842752
"reached — stopping exploration.",
27852753
_MAX_POOL_REBUILDS,
27862754
)
2755+
_drain_broken_futures() # empties futures_map before exit
27872756
break
27882757
_drain_broken_futures() # empties futures_map
27892758
_rebuild_pool() # replaces executor, clears pool_broken
@@ -2869,7 +2838,7 @@ def _rebuild_pool() -> None:
28692838
_try_submit()
28702839

28712840
finally:
2872-
executor.shutdown(wait=not timed_out, cancel_futures=timed_out)
2841+
executor.shutdown(wait=True, cancel_futures=False)
28732842
try:
28742843
self.graph.save(self.graph_json_path)
28752844
self._write_priority_log(priority_log)
@@ -2898,26 +2867,39 @@ def _process_single_result(
28982867
outcome to unconditionally free the key from _in_flight.
28992868
"""
29002869
self.graph.last_iteration = max(self.graph.last_iteration, iteration)
2901-
self._append_history(history_log, iteration, task, status)
2870+
effective_status = status
29022871

2903-
if status == "DONE":
2904-
logger.info(
2905-
"Iter %06d (batch): _run_autots returned %d profile director%s.",
2906-
iteration, len(profile_dirs),
2907-
"y" if len(profile_dirs) == 1 else "ies",
2872+
try:
2873+
if status == "DONE":
2874+
logger.info(
2875+
"Iter %06d (batch): _run_autots returned %d profile director%s.",
2876+
iteration, len(profile_dirs),
2877+
"y" if len(profile_dirs) == 1 else "ies",
2878+
)
2879+
for pdir in profile_dirs:
2880+
self._process_profile(pdir, run_dir)
2881+
# Persist only after confirming success (FAILED / TIMEOUT remain retryable)
2882+
self.explored_log.record(task.node_id, atom_i, atom_j, gamma_sign)
2883+
except Exception as exc:
2884+
logger.error(
2885+
"_process_single_result: _process_profile failed for run %s: %s — "
2886+
"marking FAILED; task remains retryable on resume.",
2887+
run_dir, exc,
29082888
)
2909-
for pdir in profile_dirs:
2910-
self._process_profile(pdir, run_dir)
2911-
# Persist only after confirming success (FAILED / TIMEOUT remain retryable)
2912-
self.explored_log.record(task.node_id, atom_i, atom_j, gamma_sign)
2913-
2914-
# Release the in-flight lock regardless of outcome
2915-
self.queue.release((task.node_id, tuple(task.afir_params)))
2916-
if hasattr(self.queue, "set_graph"):
2917-
self.queue.set_graph(self.graph)
2918-
2889+
effective_status = "FAILED"
2890+
finally:
2891+
# Release the in-flight lock regardless of outcome.
2892+
# Must be in a finally block so that exceptions inside _process_profile
2893+
# (e.g. disk full in _persist_node_xyz) do not leave the key stuck in
2894+
# _in_flight permanently — which would silently block the pair from ever
2895+
# being re-queued even on resume.
2896+
self.queue.release((task.node_id, tuple(task.afir_params)))
2897+
if hasattr(self.queue, "set_graph"):
2898+
self.queue.set_graph(self.graph)
2899+
2900+
self._append_history(history_log, iteration, task, effective_status)
29192901
self._finalize_iteration(
2920-
run_dir, task, status, profile_dirs,
2902+
run_dir, task, effective_status, profile_dirs,
29212903
os.path.join(self.output_dir, "queue_priority.log"),
29222904
iteration,
29232905
)
@@ -3302,7 +3284,6 @@ def _run_autots(
33023284
f"_run_autots: worker exceeded hard timeout of "
33033285
f"{self.worker_timeout_s}s — process was force-killed."
33043286
)
3305-
# Shutdown is the caller's responsibility (see _run_sequential finally clause).
33063287

33073288
# ------------------------------------------------------------------ #
33083289
# Energy back-fill #
@@ -3503,14 +3484,14 @@ def _process_profile(self, profile_dir: str, run_dir: str) -> None:
35033484
ts_energy: float | None = result["ts_energy"]
35043485

35053486
# ── Step 1: parse new TS geometry ─────────────────────────────────
3506-
# Failure here must NOT skip EQ endpoint registration (Step 2).
3507-
# A missing or unreadable TS file means we cannot add a TSEdge, but
3508-
# the IRC endpoints are still valid EQ structures worth keeping.
3487+
# Parsing failure must NOT skip EQ endpoint registration (Step 2).
3488+
# An unreadable TS file means we cannot add a TSEdge, but the IRC
3489+
# endpoints are still valid EQ structures worth keeping.
35093490
ts_sym: list[str] = []
35103491
ts_coords: np.ndarray = np.empty((0, 3), dtype=float)
3511-
ts_xyz = result.get("ts_xyz_file", "") or ""
3492+
ts_xyz = result["ts_xyz_file"]
35123493
ts_geom_ok = False
3513-
if ts_xyz and os.path.isfile(ts_xyz):
3494+
if ts_xyz:
35143495
try:
35153496
ts_sym, ts_coords = parse_xyz(ts_xyz)
35163497
ts_geom_ok = True
@@ -3523,12 +3504,6 @@ def _process_profile(self, profile_dir: str, run_dir: str) -> None:
35233504
"TSEdge will not be added, but EQ endpoints will still be registered.",
35243505
ts_xyz, exc,
35253506
)
3526-
else:
3527-
logger.warning(
3528-
"_process_profile: TS XYZ not found (profile_dir=%s, file=%r) — "
3529-
"TSEdge will not be added, but EQ endpoints will still be registered.",
3530-
profile_dir, ts_xyz,
3531-
)
35323507

35333508
# ── Step 2: register EQ endpoint nodes ───────────────────────────
35343509
node_id_1 = self._find_or_register_node(
@@ -3662,9 +3637,8 @@ def _enqueue_perturbations(self, node: EQNode, force_add: bool = False) -> None:
36623637
nid = node.node_id
36633638

36643639
# ── Filter out already-explored and already-queued pairs before sampling ─
3665-
# In the previous implementation, generate_afir_perturbations sampled
3666-
# randomly and then checked for exclusions, which could yield zero valid
3667-
# tasks when all selected pairs were already explored.
3640+
# Checking exclusions before sampling ensures that random selection
3641+
# only draws from genuinely unexplored pairs.
36683642
unexplored = [
36693643
(i0, j0) for i0, j0 in all_candidates
36703644
if any(

0 commit comments

Comments
 (0)