@@ -3172,7 +3172,7 @@ def _run_autots(
31723172 self ,
31733173 task : ExplorationTask ,
31743174 run_dir : str ,
3175- executor : ProcessPoolExecutor , # ADDED: caller-owned executor passed in
3175+ executor : ProcessPoolExecutor ,
31763176 ) -> list [str ]:
31773177 """Run AutoTSWorkflow in an isolated spawned subprocess.
31783178
@@ -3189,9 +3189,10 @@ def _run_autots(
31893189 future.result(timeout=worker_timeout_s) raises concurrent.futures.
31903190 TimeoutError when the limit is exceeded. The handler cancels
31913191 pending work and force-kills the live worker via executor._processes
3192- (same approach as _run_parallel_rolling), then raises RuntimeError.
3193- Executor shutdown is left to the caller so that a single timeout does
3194- not tear down the shared executor prematurely.
3192+ (same approach as _run_parallel_rolling), then raises
3193+ BrokenProcessPool so that the caller's except BrokenProcessPool
3194+ branch fires immediately and rebuilds the executor without wasting
3195+ one extra iteration.
31953196
31963197 Return value:
31973198 _autots_worker returns a sorted list of Step-4 profile directories;
@@ -3214,56 +3215,50 @@ def _run_autots(
32143215 except Exception as exc :
32153216 logger .warning ("_run_autots: could not write config_used.json: %s" , exc )
32163217
3217- # REMOVED: executor creation block.
3218- # ProcessPoolExecutor is now created once in _run_sequential and passed in,
3219- # eliminating the per-iteration spawn/join overhead.
3220- # max_tasks_per_child=1 is still set on the shared executor (see
3221- # _run_sequential) so each task still runs in a fresh child process,
3222- # keeping os.chdir() isolation intact.
3223-
32243218 future = executor .submit (_autots_worker , config , run_dir , workspace )
32253219 try :
32263220 return future .result (timeout = self .worker_timeout_s )
3227- except (TimeoutError , FuturesTimeoutError ):
3221+ except (TimeoutError , FuturesTimeoutError ):
32283222 logger .error (
32293223 "_run_autots: worker exceeded hard timeout of %ds — "
32303224 "force-killing worker process." ,
32313225 self .worker_timeout_s ,
32323226 )
32333227 future .cancel ()
3234- # No public API exposes individual worker handles; use the
3235- # private _processes dict (same pattern as _run_parallel_rolling).
32363228 worker_procs = getattr (executor , "_processes" , {})
32373229 for pid , proc in list (worker_procs .items ()):
32383230 if proc .is_alive ():
32393231 logger .warning (
32403232 "_run_autots: force-killing worker pid=%d" , pid
32413233 )
32423234 proc .kill ()
3243- raise RuntimeError (
3235+ # Raise BrokenProcessPool (not RuntimeError) so that _run_sequential's
3236+ # except BrokenProcessPool branch fires immediately and rebuilds the
3237+ # executor in the same iteration, avoiding one wasted no-op iteration.
3238+ raise BrokenProcessPool (
32443239 f"_run_autots: worker exceeded hard timeout of "
3245- f"{ self .worker_timeout_s } s."
3240+ f"{ self .worker_timeout_s } s — process was force-killed ."
32463241 )
3247- # REMOVED: finally block with executor.shutdown().
3248- # Shutdown is now the caller's responsibility (see _run_sequential finally clause).
3242+ # Shutdown is the caller's responsibility (see _run_sequential finally clause).
32493243
32503244 # ------------------------------------------------------------------ #
32513245 # Energy back-fill #
32523246 # ------------------------------------------------------------------ #
32533247
32543248 def _flush_node_energy_updates (self ) -> None :
32553249 """Apply all accumulated node energy back-fills in bulk.
3256-
3250+
32573251 Design rules:
32583252 * Called once per iteration, immediately before ``graph.save()``.
32593253 * Updates only when the existing value is ``None`` and the new value
3260- is not None.
3254+ is not None.
32613255 * TSEdge barrier values are never modified (IRC-measured values are
3262- preserved).
3256+ preserved).
32633257 """
32643258 if not self ._pending_node_updates :
32653259 return
3266-
3260+
3261+ actually_changed = False
32673262 for node_id , updates in self ._pending_node_updates .items ():
32683263 node = self .graph .get_node (node_id )
32693264 if node is None :
@@ -3272,27 +3267,30 @@ def _flush_node_energy_updates(self) -> None:
32723267 node_id ,
32733268 )
32743269 continue
3275-
3270+
32763271 changed : list [str ] = []
32773272 if updates .energy is not None and node .energy is None :
32783273 node .energy = updates .energy
32793274 changed .append (f"energy={ updates .energy :.10f} Ha" )
3280-
3275+
32813276 if updates .free_energy is not None and node .free_energy is None :
32823277 node .free_energy = updates .free_energy
32833278 changed .append (f"free_energy={ updates .free_energy :.10f} Ha" )
3284-
3279+
32853280 if changed :
3281+ actually_changed = True
32863282 logger .info (
32873283 "_flush_node_energy_updates: EQ%d updated — %s" ,
32883284 node_id , " " .join (changed ),
32893285 )
3290-
3286+
32913287 self ._pending_node_updates .clear ()
3292- # Invalidate the cache because node energies have changed; the index
3293- # will be rebuilt on the next window query.
3294- self .graph .invalidate_energy_cache ()
3295-
3288+ # Invalidate only when at least one node was actually modified.
3289+ # Skipping the call when nothing changed avoids a needless O(N log N)
3290+ # index rebuild on the next window query.
3291+ if actually_changed :
3292+ self .graph .invalidate_energy_cache ()
3293+
32963294 # ------------------------------------------------------------------ #
32973295 # Node identity and registration #
32983296 # ------------------------------------------------------------------ #
0 commit comments