From 09ef6b5e87d45a64022d4a842ff78ea5ea199335 Mon Sep 17 00:00:00 2001
From: Mark Coletti
Date: Mon, 27 Apr 2026 15:37:43 -0400
Subject: [PATCH 01/11] fix: replace worker.logger with centralized logging in task launch process

---
 ipsframework/services.py | 40 ++++++++++++++++++++++------------------
 1 file changed, 22 insertions(+), 18 deletions(-)

diff --git a/ipsframework/services.py b/ipsframework/services.py
index 711dfff5..1387eef9 100644
--- a/ipsframework/services.py
+++ b/ipsframework/services.py
@@ -89,15 +89,19 @@ def launch(binary: Any, task_name: str, working_dir: Union[str, os.PathLike], *a
     :param working_dir: The working directory in which to run this task
     :returns: The task name and the return value from running the binary.
     """
+    import logging
     from dask.distributed import get_worker  # pylint: disable=import-outside-toplevel
 
+    # We use Client.forward_logging() to handle these log messages.
+    log = logging.getLogger('launch')
+
     worker = get_worker()
     if not hasattr(worker, 'lock'):
         worker.lock = threading.Lock()
 
     worker_name = ''.join(c for c in worker.name if c.isalnum())
 
-    worker.logger.info(f'Launching task {task_name} with worker {worker_name} in {working_dir}')
+    log.info(f'Launching task {task_name} with worker {worker_name} in {working_dir}')
 
     start_time = time.time()
     os.chdir(working_dir)
@@ -106,10 +110,10 @@ def launch(binary: Any, task_name: str, working_dir: Union[str, os.PathLike], *a
     try:
         event_logfile = keywords['worker_event_logfile'].format(worker_name)
     except (KeyError, AttributeError):
-        worker.logger.warning('No worker_event_logfile specified, using stdout for logging')
+        log.warning('No worker_event_logfile specified, using stdout for logging')
     else:
         worker_event_log = open(event_logfile, 'a')
-        worker.logger.info(f'Worker event log file: {event_logfile}')
+        log.info(f'Worker event log file: {event_logfile}')
 
     ret_val = None
     if isinstance(binary, str):
@@ -117,41 +121,41 @@ def launch(binary: Any, task_name: str, working_dir: Union[str, os.PathLike], *a
         try:
             log_filename = keywords['logfile']
         except KeyError:
-            worker.logger.info('No logfile specified, using stdout for task output')
+            log.info('No logfile specified, using stdout for task output')
         else:
             task_stdout = open(log_filename, 'w')
-            worker.logger.info(f'Task output log file: {log_filename}')
+            log.info(f'Task output log file: {log_filename}')
 
         task_stderr = subprocess.STDOUT
         try:
             err_filename = keywords['errfile']
         except KeyError:
-            worker.logger.info('No errfile specified, using STDOUT for task errors')
+            log.info('No errfile specified, using STDOUT for task errors')
         else:
             try:
                 task_stderr = open(err_filename, 'w')
             except OSError:
-                worker.logger.info(f'Could not open errfile {err_filename}, using STDOUT for task errors')
+                log.info(f'Could not open errfile {err_filename}, using STDOUT for task errors')
                 task_stderr = subprocess.STDOUT
             else:
-                worker.logger.info(f'Task error log file: {err_filename}')
+                log.info(f'Task error log file: {err_filename}')
 
         task_env = keywords.get('task_env', {})
         new_env = os.environ.copy()
         new_env.update(task_env)
 
         if 'HWLOC_XMLFILE' in new_env:
-            worker.logger.debug('Removing HWLOC_XMLFILE from task environment')
+            log.debug('Removing HWLOC_XMLFILE from task environment')
             del new_env['HWLOC_XMLFILE']
 
         # Check that the DVM environment variables are set.
        if hasattr(worker, 'dvm_uri_file'):
            dvm_uri_file = Path(worker.dvm_uri_file)
            if not dvm_uri_file.exists():
-                worker.logger.error(f'DVM URI file {dvm_uri_file} does not exist')
+                log.error(f'DVM URI file {dvm_uri_file} does not exist')
                 print(f'DVM URI file {dvm_uri_file} does not exist', flush=True)
             else:
-                worker.logger.debug(f'Using DVM URI file: {dvm_uri_file}')
+                log.debug(f'Using DVM URI file: {dvm_uri_file}')
                 print(f'Using DVM URI file: {dvm_uri_file}', flush=True)
 
         # PMIX_SERVER_URI41 is used by prun to figure out how to talk to the DVM
@@ -160,13 +164,13 @@ def launch(binary: Any, task_name: str, working_dir: Union[str, os.PathLike], *a
         # in some HPC environments to ensure the output appears in the logs.
         if task_env is not None and task_env != {}:
             if 'PMIX_SERVER_URI41' in task_env:
-                worker.logger.debug(f"DVM environment variable PMIX_SERVER_URI41 "
+                log.debug(f"DVM environment variable PMIX_SERVER_URI41 "
                           f"set in task_env to "
                           f"{task_env['PMIX_SERVER_URI41']}")
                 print(f'DVM environment variable PMIX_SERVER_URI41 set in task_'
                       f'env to {task_env["PMIX_SERVER_URI41"]}', flush=True)
             if 'PMIX_SERVER_URI41' in os.environ:
-                worker.logger.debug(f"DVM environment variable PMIX_SERVER_URI41 set "
+                log.debug(f"DVM environment variable PMIX_SERVER_URI41 set "
                           f"in os.environ to "
                           f"{os.environ['PMIX_SERVER_URI41']}")
                 print(f'DVM environment variable PMIX_SERVER_URI41 set in os.environ '
                       f'to {os.environ["PMIX_SERVER_URI41"]}', flush=True)
@@ -176,7 +180,7 @@ def launch(binary: Any, task_name: str, working_dir: Union[str, os.PathLike], *a
 
         cmd = f'{binary} {" ".join(map(str, args))}'
 
-        worker.logger.debug(f'Launching task {task_name} with command: {cmd}')
+        log.debug(f'Launching task {task_name} with command: {cmd}')
 
         with worker.lock:
             print(
@@ -203,7 +207,7 @@ def launch(binary: Any, task_name: str, working_dir: Union[str, os.PathLike], *a
                     ),
                     file=worker_event_log,
                 )
-            worker.logger.error(f'Failed to launch task {task_name} with command {cmd}: {e}')
+            log.error(f'Failed to launch task {task_name} with command {cmd}: {e}')
             raise
 
         try:
@@ -231,7 +235,7 @@ def launch(binary: Any, task_name: str, working_dir: Union[str, os.PathLike], *a
                     file=worker_event_log,
                 )
             os.killpg(process.pid, signal.SIGKILL)
-            worker.logger.error(f'Task {task_name} with command {cmd} timed out after {timeout}s')
+            log.error(f'Task {task_name} with command {cmd} timed out after {timeout}s')
             ret_val = -1
         except Exception as e:
             with worker.lock:
@@ -240,7 +244,7 @@ def launch(binary: Any, task_name: str, working_dir: Union[str, os.PathLike], *a
                         {'eventType': 'IPS_TASK_END', 'event_time': time.time(), 'comment': f'task_name = {task_name} Exception when calling {binary!s}: {e}'}
                     ),
                 )
-            worker.logger.error(f'Task {task_name} with command {cmd} failed with {e}')
+            log.error(f'Task {task_name} with command {cmd} failed with {e}')
     else:
         with worker.lock:
             print(
@@ -272,7 +276,7 @@ def launch(binary: Any, task_name: str, working_dir: Union[str, os.PathLike], *a
                 file=worker_event_log,
             )
 
-    worker.logger.info(f'Task {task_name} finished with return value: {ret_val}')
+    log.info(f'Task {task_name} finished with return value: {ret_val}')
 
     return task_name, ret_val
 

From 5209f7f28b88c7050340a3f8f449cfd675e8a5d3 Mon Sep 17 00:00:00 2001
From: Mark Coletti
Date: Mon, 27 Apr 2026 16:37:44 -0400
Subject: [PATCH 02/11] fix: improve task logging and error handling in Dask task launch

---
 ipsframework/services.py | 241 ++++++++++++++++++++-------------------
 1 file changed, 122 insertions(+), 119 deletions(-)

diff --git a/ipsframework/services.py b/ipsframework/services.py
index 1387eef9..0105cded 100644
--- a/ipsframework/services.py
+++ b/ipsframework/services.py
@@ -39,6 +39,7 @@ rich.traceback.install(show_locals=True)
 
 from configobj import ConfigObj
+from dask import get_current_task
 from distributed import Client, Worker, WorkerPlugin
 
 from ipsframework import ipsutil, messages
@@ -62,28 +63,29 @@ class RunningTask(NamedTuple):
     args: list[str]
 
 
-def launch(binary: Any, task_name: str, working_dir: Union[str, os.PathLike], *args, **keywords):
-    """This is used by
-    :meth:`TaskPool.submit_dask_tasks` as the
+def launch(binary: Any,
+           task_name: str,
+           working_dir: Union[str, os.PathLike],
+           *args, **kwargs):
+    """ This is used by :meth:`TaskPool.submit_dask_tasks` as the
     input to :meth:`dask.distributed.Client.submit`.
 
-    Valid keywords:
-    * `worker_event_logfile` - where JSON log messages are written
-    * `logfile` - where the task output is written; if not specified, STDOUT used
-    * `errfile` - where the task error output is written; if not specified, STDOUT used
+    Valid kwargs:
+    * `logfile` - where the task output is written; if not specified,
+      STDOUT used
+    * `errfile` - where the task error output is written; if not specified,
+      STDOUT used
     * `task_env` - A dictionary of environment variables to set
     * `timeout` - The timeout in seconds for the task to complete.
-    * `cpus_per_proc` - The number of cpus per process to use for the task. This implies that the DVMPlugin has set up a DVM daemon for this node.
-    * `oversubscribe` - If `True`, then the number of processes can exceed the number of cores on the node. Default is `False`.
+    * `cpus_per_proc` - The number of cpus per process to use for the task.
+      This implies that the DVMPlugin has set up a DVM daemon for this node.
+    * `oversubscribe` - If `True`, then the number of processes can exceed the
+      number of cores on the node. Default is `False`.
 
     If the worker has the attribute `dvm_uri_file`, then we are running
     with a DVM (Distributed Virtual Machine) so the `binary` needs a `prun`
     prepended pointing to that.
 
-    If the worker doesn't have a `lock` attribute, then we create one by
-    assigning a threading lock to it. This is used to ensure that
-    the worker's event log is written to in a thread-safe manner.
-
     :param binary: The binary to launch. Either a string or a class.
     :param task_name: The name of the task.
     :param working_dir: The working directory in which to run this task
@@ -92,55 +94,50 @@ def launch(binary: Any, task_name: str, working_dir: Union[str, os.PathLike], *a
     import logging
     from dask.distributed import get_worker  # pylint: disable=import-outside-toplevel
 
-    # We use Client.forward_logging() to handle these log messages.
+    # Later, we use Client.forward_logging() to handle these log messages.
     log = logging.getLogger('launch')
 
     worker = get_worker()
-    if not hasattr(worker, 'lock'):
-        worker.lock = threading.Lock()
-
-    worker_name = ''.join(c for c in worker.name if c.isalnum())
+    task_key = get_current_task()
 
-    log.info(f'Launching task {task_name} with worker {worker_name} in {working_dir}')
+    log.info(f'Launching task {task_name} with id {task_key!s} and '
+             f'worker {worker.name!s} in {working_dir}')
 
     start_time = time.time()
     os.chdir(working_dir)
 
-    worker_event_log = sys.stdout
-    try:
-        event_logfile = keywords['worker_event_logfile'].format(worker_name)
-    except (KeyError, AttributeError):
-        log.warning('No worker_event_logfile specified, using stdout for logging')
-    else:
-        worker_event_log = open(event_logfile, 'a')
-        log.info(f'Worker event log file: {event_logfile}')
-
     ret_val = None
     if isinstance(binary, str):
-        task_stdout = sys.stdout
+        # This is presumably an external binary executable to be executed
+        # via a subprocess.Popen()
+
+        # Do we write the Popen stdout to sys.stdout or to a file?
+        subprocess_stdout = sys.stdout
         try:
-            log_filename = keywords['logfile']
+            log_filename = kwargs['logfile']
         except KeyError:
             log.info('No logfile specified, using stdout for task output')
         else:
-            task_stdout = open(log_filename, 'w')
+            subprocess_stdout = open(log_filename, 'w')
             log.info(f'Task output log file: {log_filename}')
 
+        # Repeat the same for stderr
         task_stderr = subprocess.STDOUT
         try:
-            err_filename = keywords['errfile']
+            subprocess_stderr = kwargs['errfile']
         except KeyError:
             log.info('No errfile specified, using STDOUT for task errors')
         else:
             try:
-                task_stderr = open(err_filename, 'w')
+                task_stderr = open(subprocess_stderr, 'w')
             except OSError:
-                log.info(f'Could not open errfile {err_filename}, using STDOUT for task errors')
+                log.info(f'Could not open errfile {subprocess_stderr}, '
+                         f'using STDOUT for task errors')
                 task_stderr = subprocess.STDOUT
             else:
-                log.info(f'Task error log file: {err_filename}')
+                log.info(f'Task error log file: {subprocess_stderr}')
 
-        task_env = keywords.get('task_env', {})
+        task_env = kwargs.get('task_env', {})
         new_env = os.environ.copy()
         new_env.update(task_env)
 
@@ -153,10 +150,10 @@ def launch(binary: Any, task_name: str, working_dir: Union[str, os.PathLike], *a
             dvm_uri_file = Path(worker.dvm_uri_file)
             if not dvm_uri_file.exists():
                 log.error(f'DVM URI file {dvm_uri_file} does not exist')
-                print(f'DVM URI file {dvm_uri_file} does not exist', flush=True)
+                # print(f'DVM URI file {dvm_uri_file} does not exist', flush=True)
             else:
                 log.debug(f'Using DVM URI file: {dvm_uri_file}')
-                print(f'Using DVM URI file: {dvm_uri_file}', flush=True)
+                # print(f'Using DVM URI file: {dvm_uri_file}', flush=True)
 
         # PMIX_SERVER_URI41 is used by prun to figure out how to talk to the DVM
         # It can be defined in `task_env` or in `os.environ`, so we look in
@@ -167,114 +164,120 @@ def launch(binary: Any, task_name: str, working_dir: Union[str, os.PathLike], *a
                 log.debug(f"DVM environment variable PMIX_SERVER_URI41 "
                           f"set in task_env to "
                           f"{task_env['PMIX_SERVER_URI41']}")
-                print(f'DVM environment variable PMIX_SERVER_URI41 set in task_'
-                      f'env to {task_env["PMIX_SERVER_URI41"]}', flush=True)
+                # print(f'DVM environment variable PMIX_SERVER_URI41 set in task_'
+                #       f'env to {task_env["PMIX_SERVER_URI41"]}', flush=True)
             if 'PMIX_SERVER_URI41' in os.environ:
                 log.debug(f"DVM environment variable PMIX_SERVER_URI41 set "
                           f"in os.environ to "
                           f"{os.environ['PMIX_SERVER_URI41']}")
-                print(f'DVM environment variable PMIX_SERVER_URI41 set in os.environ '
-                      f'to {os.environ["PMIX_SERVER_URI41"]}', flush=True)
{os.environ["PMIX_SERVER_URI41"]}', flush=True) + # print(f'DVM environment variable PMIX_SERVER_URI41 set in os.environ ' + # f'to {os.environ["PMIX_SERVER_URI41"]}', flush=True) - timeout = float(keywords.get('timeout', 1.0e9)) + timeout = float(kwargs.get('timeout', 1.0e9)) cmd = f'{binary} {" ".join(map(str, args))}' log.debug(f'Launching task {task_name} with command: {cmd}') - with worker.lock: - print( - json.dumps({'eventType': 'IPS_LAUNCH_DASK_TASK', 'event_time': time.time(), 'comment': f'task_name = {task_name}, Target = {cmd}'}), - file=worker_event_log, - ) + worker.log_event('ips', + { + 'eventType' : 'IPS_LAUNCH_DASK_TASK', + 'event_time': start_time, + 'comment' : f'task_name = {task_name}, ' + f'Task key = {task_key!s}, ' + f'Target = {cmd}' + }) cmd_lst = cmd.split() try: - process = subprocess.Popen(cmd_lst, stdout=task_stdout, + process = subprocess.Popen(cmd_lst, stdout=subprocess_stdout, stderr=task_stderr, cwd=working_dir, preexec_fn=os.setsid, env=new_env) # noqa: PLW1509 (TODO: look into this to potentially avoid deadlocks) except Exception as e: - with worker.lock: - print( - json.dumps( - { - 'eventType': 'IPS_TASK_END', - 'event_time': time.time(), - 'comment': f'task_name = {task_name} Exception when calling {binary!s}: {e}', - 'operation': ' '.join(map(str, args)), - } - ), - file=worker_event_log, - ) - log.error(f'Failed to launch task {task_name} with command {cmd}: {e}') + worker.log_event('ips', + { + 'eventType' : 'IPS_DASK_TASK_END', + 'event_time': time.time(), + 'comment' : f'task_name = {task_name} ' + f'Exception when calling ' + f'{binary!s}: {e!s}', + 'operation' : ' '.join(map(str, args)), + }) + log.error(f'Failed to launch task {task_name} with ' + f'command {cmd}: {e}') raise try: ret_val = process.wait(timeout) finish_time = time.time() - with worker.lock: - print( - json.dumps( - { - 'eventType': 'IPS_TASK_END', - 'event_time': finish_time, - 'comment': f'task_name = {task_name}, elapsed time = {finish_time - start_time:.2f}s', - 'start_time': start_time, - 'elapsed_time': finish_time - start_time, - 'target': binary, - 'operation': ' '.join(map(str, args)), - } - ), - file=worker_event_log, - ) + worker.log_event('ips', + { + 'eventType' : 'IPS_DASK_TASK_END', + 'event_time' : finish_time, + 'comment' : f'task_name = ' + f'{task_name},' + f' elapsed time = ' + f'{finish_time - start_time:.2f}s', + 'start_time' : start_time, + 'elapsed_time': finish_time - start_time, + 'target' : binary, + 'operation' : ' '.join(map(str, args)), + }) + except subprocess.TimeoutExpired: - with worker.lock: - print( - json.dumps({'eventType': 'IPS_TASK_END', 'event_time': time.time(), 'comment': f'task_name = {task_name}, timed-out after {timeout}s'}), - file=worker_event_log, - ) - os.killpg(process.pid, signal.SIGKILL) - log.error(f'Task {task_name} with command {cmd} timed out after {timeout}s') + worker.log_event('ips', + { + 'eventType' : 'IPS_DASK_TASK_END', + 'event_time': time.time(), + 'comment' : f'task_name = {task_name}, ' + f'timed-out after ' + f'{timeout}s'}) + process.kill() + log.error(f'Task {task_name} with command {cmd} timed out ' + f'after {timeout}s') ret_val = -1 except Exception as e: - with worker.lock: - print( - json.dumps( - {'eventType': 'IPS_TASK_END', 'event_time': time.time(), 'comment': f'task_name = {task_name} Exception when calling {binary!s}: {e}'} - ), - ) - log.error(f'Task {task_name} with command {cmd} failed with {e}') - else: - with worker.lock: - print( - json.dumps( - { - 'eventType': 'IPS_LAUNCH_DASK_TASK', - 
-                        'event_time': time.time(),
-                        'comment': f'task_name = {task_name}, Target = {binary.__name__}({",".join(map(str, args))})',
-                    }
-                ),
-                file=worker_event_log,
-            )
+            worker.log_event('ips',
+                             {
+                                 'eventType' : 'IPS_DASK_TASK_END',
+                                 'event_time': time.time(),
+                                 'comment'   : f'task_name = {task_name} '
+                                               f'Exception when calling '
+                                               f'{binary!s}: {e!s}'})
+            log.error(f'Task {task_name} with command {cmd} failed with {e!s}')
+    elif isinstance(binary, Callable):
+        # binary not a string, but is a python callable, so we call it directly
+        # invoke it the given *args
+        worker.log_event('ips',
+                         {
+                             'eventType' : 'IPS_LAUNCH_DASK_TASK',
+                             'event_time': time.time(),
+                             'comment'   : f'task_name = {task_name}, '
+                                           f'Target = {binary.__name__}('
+                                           f'{",".join(map(str, args))})',
+                         })
+
         ret_val = binary(*args)
         finish_time = time.time()
-        with worker.lock:
-            print(
-                json.dumps(
-                    {
-                        'eventType': 'IPS_TASK_END',
-                        'event_time': finish_time,
-                        'comment': f'task_name = {task_name}, elapsed time = {finish_time - start_time:.2f}s',
-                        'start_time': start_time,
-                        'elapsed_time': finish_time - start_time,
-                        'target': binary.__name__,
-                        'return_value': ret_val,
-                        'operation': f'({",".join(map(str, args))})',
-                    }
-                ),
-                file=worker_event_log,
-            )
+
+        worker.log_event('ips',
+                         {
+                             'eventType'   : 'IPS_DASK_TASK_END',
+                             'event_time'  : finish_time,
+                             'comment'     : f'task_name = {task_name}, '
+                                             f'elapsed time = '
+                                             f'{finish_time - start_time:.2f}s',
+                             'start_time'  : start_time,
+                             'elapsed_time': finish_time - start_time,
+                             'target'      : binary.__name__,
+                             'return_value': ret_val,
+                             'operation'   : f'({",".join(map(str, args))})',
+                         })
+    else:
+        raise RuntimeError(f'Binary argument {binary!s} is not a string or '
+                           f'callable, cannot launch task {task_name}')
 
     log.info(f'Task {task_name} finished with return value: {ret_val}')
 

From d4c6019a7d754a608b15b992780d1c5a979177b0 Mon Sep 17 00:00:00 2001
From: Mark Coletti
Date: Mon, 27 Apr 2026 16:47:54 -0400
Subject: [PATCH 03/11] fix: add Dask event processing for IPS monitor events

---
 ipsframework/services.py | 23 +++++++++++++++++++++++
 1 file changed, 23 insertions(+)

diff --git a/ipsframework/services.py b/ipsframework/services.py
index 0105cded..51d46eb3 100644
--- a/ipsframework/services.py
+++ b/ipsframework/services.py
@@ -2947,6 +2947,25 @@ def add_task(self, task_name: str, nproc: int, working_dir: str, binary: str, *a
         self.serial_pool = self.serial_pool and (nproc == 1)
         self.queued_tasks[task_name] = Task(task_name, nproc, working_dir, binary_fullpath, *args, **keywords['keywords'])
 
+    def _process_dask_event(self, event):
+        """ This will create an IPS monitor event from a Dask event
+
+        These events will have been created in `launch()`.  As they are
+        created, this callback will be invoked to send the corresponding
+        IPS monitor event.
+
+        This callback is registered in `submit_dask_tasks()`.
+
+        :param event: Dask event tuple of (timestamp, dict)
+        """
+        timestamp, message = event
+
+        self.services.debug(f'Processing dask event: {message!s}, '
+                            f'timestamp: {timestamp!s}')
+
+        self.services._send_monitor_event(**message)
+
+
     def submit_dask_tasks(
         self,
         block=True,
@@ -3176,6 +3195,10 @@ def _make_worker_args(num_workers: int, num_threads: int, use_shifter: bool, shi
         # logger so that it can be captured by the services.
         self.dask_client.forward_logging()
 
+        # Register callback to handle 'ips' events sent by
+        # launch() that will be converted to IPS monitor events.
+        self.dask_client.subscribe_topic('ips', self._process_dask_event)
+
         if dask_worker_plugin is not None:
             # TODO But what if there is more than one worker plugin?
             # TODO And what about scheduler plugins?
 

From b64ca442b59c4871fe23074dd8d63821604cf832 Mon Sep 17 00:00:00 2001
From: Mark Coletti
Date: Mon, 27 Apr 2026 16:53:58 -0400
Subject: [PATCH 04/11] fix: unregister Dask topic handler and clean up worker event logging

---
 ipsframework/services.py | 31 +------------------------------
 1 file changed, 1 insertion(+), 30 deletions(-)

diff --git a/ipsframework/services.py b/ipsframework/services.py
index 51d46eb3..c999aa41 100644
--- a/ipsframework/services.py
+++ b/ipsframework/services.py
@@ -3345,6 +3345,7 @@ def _shutdown_dask(self):
 
         if self.dask_client is not None:
             # Shutdown handles ending client, scheduler, and workers
+            self.dask_client.unsubscribe_topic('ips')  # unregister handler
             self.dask_client.shutdown()
 
             # TODO a more gentle way to shutdown:
@@ -3434,36 +3435,6 @@ def get_dask_finished_tasks_status(self):
         self._shutdown_dask()
         self.services.debug(f'get_dask_finished_tasks_status: after _shutdown_dask()')
 
-        if self.worker_event_logfile is not None:
-            self.services.debug(f'get_dask_finished_tasks_status: worker_event_logfile: '
-                                f'{self.worker_event_logfile!s}')
-            try:
-                events = []
-                for worker in worker_names:
-                    filename = self.worker_event_logfile.format(worker)
-                    try:
-                        lines = open(filename).readlines()
-                    except IOError:
-                        self.services.exception('Error opening dask worker log file: %s', filename)
-                    else:
-                        # convert to dict and sort by event_time
-                        for line in lines:
-                            try:
-                                events.append(json.loads(line.strip()))
-                            except json.decoder.JSONDecodeError:
-                                self.services.exception('Error reading line %s from dask worker log file: %s', line.strip(), filename)
-
-                events.sort(key=itemgetter('event_time'))
-                for event in events:
-                    self.services._send_monitor_event(**event)
-            except Exception as e:
-                # If it fails for any other reason, make sure we can continue
-                self.services.exception('Error while reading dask worker log files: %s', str(e))
-            else:
-                for worker in worker_names:
-                    if os.path.isfile(self.worker_event_logfile.format(worker)):
-                        os.remove(self.worker_event_logfile.format(worker))
-
         self.finished_tasks = {}
         self.active_tasks = {}
         self.services.wait_task(self.dask_workers_tid)
 

From 8b9260245882d613795a0c5b231f7887a6f8c424 Mon Sep 17 00:00:00 2001
From: Mark Coletti
Date: Tue, 28 Apr 2026 15:33:39 -0400
Subject: [PATCH 05/11] fix: rename "binary" parameter to "executable" in launch function for clarity

---
 ipsframework/services.py | 24 ++++++++++++------------
 1 file changed, 12 insertions(+), 12 deletions(-)

diff --git a/ipsframework/services.py b/ipsframework/services.py
index c999aa41..1b37a978 100644
--- a/ipsframework/services.py
+++ b/ipsframework/services.py
@@ -63,7 +63,7 @@ class RunningTask(NamedTuple):
     args: list[str]
 
 
-def launch(binary: Any,
+def launch(executable: Any,
            task_name: str,
            working_dir: Union[str, os.PathLike],
           *args, **kwargs):
@@ -86,7 +86,7 @@ def launch(binary: Any,
     with a DVM (Distributed Virtual Machine) so the `binary` needs a `prun`
     prepended pointing to that.
 
-    :param binary: The binary to launch. Either a string or a class.
+    :param executable: The binary to launch. Either a string or a class.
     :param task_name: The name of the task.
     :param working_dir: The working directory in which to run this task
     :returns: The task name and the return value from running the binary.
@@ -107,7 +107,7 @@ def launch(executable: Any,
     os.chdir(working_dir)
 
     ret_val = None
-    if isinstance(binary, str):
+    if isinstance(executable, str):
         # This is presumably an external binary executable to be executed
         # via a subprocess.Popen()
 
@@ -175,7 +175,7 @@ def launch(executable: Any,
 
         timeout = float(kwargs.get('timeout', 1.0e9))
 
-        cmd = f'{binary} {" ".join(map(str, args))}'
+        cmd = f'{executable} {" ".join(map(str, args))}'
 
         log.debug(f'Launching task {task_name} with command: {cmd}')
 
@@ -201,7 +201,7 @@ def launch(executable: Any,
                                  'event_time': time.time(),
                                  'comment'   : f'task_name = {task_name} '
                                                f'Exception when calling '
-                                               f'{binary!s}: {e!s}',
+                                               f'{executable!s}: {e!s}',
                                  'operation' : ' '.join(map(str, args)),
                              })
             log.error(f'Failed to launch task {task_name} with '
@@ -221,7 +221,7 @@ def launch(executable: Any,
                                                  f'{finish_time - start_time:.2f}s',
                                  'start_time'  : start_time,
                                  'elapsed_time': finish_time - start_time,
-                                 'target'      : binary,
+                                 'target'      : executable,
                                  'operation'   : ' '.join(map(str, args)),
                              })
 
@@ -244,9 +244,9 @@ def launch(executable: Any,
                                  'event_time': time.time(),
                                  'comment'   : f'task_name = {task_name} '
                                                f'Exception when calling '
-                                               f'{binary!s}: {e!s}'})
+                                               f'{executable!s}: {e!s}'})
             log.error(f'Task {task_name} with command {cmd} failed with {e!s}')
-    elif isinstance(binary, Callable):
+    elif isinstance(executable, Callable):
         # binary not a string, but is a python callable, so we call it directly
         # invoke it the given *args
         worker.log_event('ips',
@@ -254,11 +254,11 @@ def launch(executable: Any,
                             'eventType' : 'IPS_LAUNCH_DASK_TASK',
                             'event_time': time.time(),
                             'comment'   : f'task_name = {task_name}, '
-                                          f'Target = {binary.__name__}('
+                                          f'Target = {executable.__name__}('
                                           f'{",".join(map(str, args))})',
                          })
 
-        ret_val = binary(*args)
+        ret_val = executable(*args)
         finish_time = time.time()
 
         worker.log_event('ips',
@@ -270,13 +270,13 @@ def launch(executable: Any,
                                             f'{finish_time - start_time:.2f}s',
                             'start_time'  : start_time,
                             'elapsed_time': finish_time - start_time,
-                            'target'      : binary.__name__,
+                            'target'      : executable.__name__,
                             'return_value': ret_val,
                             'operation'   : f'({",".join(map(str, args))})',
                          })
     else:
-        raise RuntimeError(f'Binary argument {binary!s} is not a string or '
+        raise RuntimeError(f'Binary argument {executable!s} is not a string or '
                            f'callable, cannot launch task {task_name}')
 
     log.info(f'Task {task_name} finished with return value: {ret_val}')
 

From f2a5ea0b34aff563c3c52c0995339e6f0457fc08 Mon Sep 17 00:00:00 2001
From: Mark Coletti
Date: Tue, 28 Apr 2026 15:41:14 -0400
Subject: [PATCH 06/11] fix: update task key retrieval to use worker method

---
 ipsframework/services.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/ipsframework/services.py b/ipsframework/services.py
index 1b37a978..44d70c70 100644
--- a/ipsframework/services.py
+++ b/ipsframework/services.py
@@ -39,7 +39,6 @@ rich.traceback.install(show_locals=True)
 
 from configobj import ConfigObj
-from dask import get_current_task
 from distributed import Client, Worker, WorkerPlugin
 
 from ipsframework import ipsutil, messages
@@ -98,7 +97,7 @@ def launch(executable: Any,
     log = logging.getLogger('launch')
 
     worker = get_worker()
-    task_key = get_current_task()
+    task_key = worker.get_current_task()
 
     log.info(f'Launching task {task_name} with id {task_key!s} and '
             f'worker {worker.name!s} in {working_dir}')
 

From 63e845af67ee51f65dd4d69866696cc83334f8d7 Mon Sep 17 00:00:00 2001
From: Mark Coletti
Date: Tue, 28 Apr 2026 16:07:18 -0400
Subject: [PATCH 07/11] fix: update task key retrieval to use worker method

---
 ipsframework/services.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/ipsframework/services.py b/ipsframework/services.py
index 44d70c70..02b4e565 100644
--- a/ipsframework/services.py
+++ b/ipsframework/services.py
@@ -2961,6 +2961,8 @@ def _process_dask_event(self, event):
 
         self.services.debug(f'Processing dask event: {message!s}, '
                             f'timestamp: {timestamp!s}')
+        print(f'Processing dask event: {message!s}, '
+              f'timestamp: {timestamp!s}')
 
         self.services._send_monitor_event(**message)
 

From c21a32b0097d4b73c28d2bf9a87d23391e35a4 Mon Sep 17 00:00:00 2001
From: Mark Coletti
Date: Tue, 28 Apr 2026 16:14:45 -0400
Subject: [PATCH 08/11] fix: remove 'worker' key from Dask event message to prevent errors

---
 ipsframework/services.py | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/ipsframework/services.py b/ipsframework/services.py
index 02b4e565..e97ce60d 100644
--- a/ipsframework/services.py
+++ b/ipsframework/services.py
@@ -2961,8 +2961,13 @@ def _process_dask_event(self, event):
 
         self.services.debug(f'Processing dask event: {message!s}, '
                             f'timestamp: {timestamp!s}')
-        print(f'Processing dask event: {message!s}, '
-              f'timestamp: {timestamp!s}')
+
+        if 'worker' in message:
+            # Sneaky Dask will surreptitiously add 'worker', which is
+            # not mentioned in the API documentation.  If we don't remove
+            # this, _send_monitor_event() will fail because it doesn't expect
+            # this argument.
+            del message['worker']
 
         self.services._send_monitor_event(**message)
 

From 4025920a00e7fbb1b9dccab7b01f69878404a317 Mon Sep 17 00:00:00 2001
From: Mark Coletti
Date: Tue, 28 Apr 2026 16:36:51 -0400
Subject: [PATCH 09/11] fix: add state information to Dask task event logging

---
 ipsframework/services.py | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/ipsframework/services.py b/ipsframework/services.py
index e97ce60d..979bad3f 100644
--- a/ipsframework/services.py
+++ b/ipsframework/services.py
@@ -182,6 +182,7 @@ def launch(executable: Any,
                          {
                              'eventType' : 'IPS_LAUNCH_DASK_TASK',
                              'event_time': start_time,
+                             'state'     : 'Running',
                              'comment'   : f'task_name = {task_name}, '
                                            f'Task key = {task_key!s}, '
                                            f'Target = {cmd}'
@@ -198,6 +199,7 @@ def launch(executable: Any,
                              {
                                  'eventType' : 'IPS_DASK_TASK_END',
                                  'event_time': time.time(),
+                                 'state'     : 'Failed',
                                  'comment'   : f'task_name = {task_name} '
                                                f'Exception when calling '
                                                f'{executable!s}: {e!s}',
@@ -214,6 +216,7 @@ def launch(executable: Any,
                              {
                                  'eventType'   : 'IPS_DASK_TASK_END',
                                  'event_time'  : finish_time,
+                                 'state'       : 'Succeeded',
                                  'comment'     : f'task_name = '
                                                  f'{task_name},'
                                                  f' elapsed time = '
@@ -229,6 +232,7 @@ def launch(executable: Any,
                              {
                                  'eventType' : 'IPS_DASK_TASK_END',
                                  'event_time': time.time(),
+                                 'state'     : 'Timed out',
                                  'comment'   : f'task_name = {task_name}, '
                                                f'timed-out after '
                                                f'{timeout}s'})
@@ -241,17 +245,19 @@ def launch(executable: Any,
                              {
                                  'eventType' : 'IPS_DASK_TASK_END',
                                  'event_time': time.time(),
+                                 'state'     : 'Failed',
                                  'comment'   : f'task_name = {task_name} '
                                                f'Exception when calling '
                                                f'{executable!s}: {e!s}'})
             log.error(f'Task {task_name} with command {cmd} failed with {e!s}')
     elif isinstance(executable, Callable):
         # binary not a string, but is a python callable, so we call it directly
-        # invoke it the given *args
+        # with the given *args
         worker.log_event('ips',
                          {
                             'eventType' : 'IPS_LAUNCH_DASK_TASK',
                             'event_time': time.time(),
+                            'state'     : 'Running',
                             'comment'   : f'task_name = {task_name}, '
                                           f'Target = {executable.__name__}('
                                           f'{",".join(map(str, args))})',
@@ -264,6 +270,7 @@ def launch(executable: Any,
                          {
                             'eventType'   : 'IPS_DASK_TASK_END',
                             'event_time'  : finish_time,
+                            'state'       : 'Succeeded',
                             'comment'     : f'task_name = {task_name}, '
                                             f'elapsed time = '
                                             f'{finish_time - start_time:.2f}s',
 

From 80a40011124ec0660c2a5154a4a5ed5ec2f4ab0b Mon Sep 17 00:00:00 2001
From: Mark Coletti
Date: Tue, 28 Apr 2026 16:40:40 -0400
Subject: [PATCH 10/11] fix: add error handling to Dask task execution and logging

---
 ipsframework/services.py | 47 +++++++++++++++++++++++++---------------
 1 file changed, 30 insertions(+), 17 deletions(-)

diff --git a/ipsframework/services.py b/ipsframework/services.py
index 979bad3f..08452121 100644
--- a/ipsframework/services.py
+++ b/ipsframework/services.py
@@ -263,24 +263,37 @@ def launch(executable: Any,
                                           f'{",".join(map(str, args))})',
                          })
 
-        ret_val = executable(*args)
-        finish_time = time.time()
+        try:
+            ret_val = executable(*args)
 
-        worker.log_event('ips',
-                         {
-                            'eventType'   : 'IPS_DASK_TASK_END',
-                            'event_time'  : finish_time,
-                            'state'       : 'Succeeded',
-                            'comment'     : f'task_name = {task_name}, '
-                                            f'elapsed time = '
-                                            f'{finish_time - start_time:.2f}s',
-                            'start_time'  : start_time,
-                            'elapsed_time': finish_time - start_time,
-                            'target'      : executable.__name__,
-                            'return_value': ret_val,
-                            'operation'   : f'({",".join(map(str,
-                                               args))})',
-                         })
+            finish_time = time.time()
+
+            worker.log_event('ips',
+                             {
+                                 'eventType'   : 'IPS_DASK_TASK_END',
+                                 'event_time'  : finish_time,
+                                 'state'       : 'Succeeded',
+                                 'comment'     : f'task_name = {task_name}, '
+                                                 f'elapsed time = '
+                                                 f'{finish_time - start_time:.2f}s',
+                                 'start_time'  : start_time,
+                                 'elapsed_time': finish_time - start_time,
+                                 'target'      : executable.__name__,
+                                 'return_value': ret_val,
+                                 'operation'   : f'({",".join(map(str,
+                                                    args))})',
+                             })
+        except Exception as e:
+            worker.log_event('ips',
+                             {
+                                 'eventType' : 'IPS_DASK_TASK_END',
+                                 'event_time': time.time(),
+                                 'state'     : 'Failed',
+                                 'comment'   : f'task_name = {task_name} '
+                                               f'Exception when calling '
+                                               f'{executable!s}: {e!s}'})
+            log.error(f'Task {task_name} with callable {executable!s} failed '
+                      f'with {e!s}')
     else:
         raise RuntimeError(f'Binary argument {executable!s} is not a string or '
                            f'callable, cannot launch task {task_name}')
 

From fa564d89fda2a08dc88a32714e6cbc52ea7bf090 Mon Sep 17 00:00:00 2001
From: Mark Coletti
Date: Tue, 28 Apr 2026 16:44:48 -0400
Subject: [PATCH 11/11] fix: correct string formatting in logging statements for Dask tasks

---
 ipsframework/services.py | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/ipsframework/services.py b/ipsframework/services.py
index 08452121..02367cfa 100644
--- a/ipsframework/services.py
+++ b/ipsframework/services.py
@@ -280,8 +280,7 @@ def launch(executable: Any,
                                  'elapsed_time': finish_time - start_time,
                                  'target'      : executable.__name__,
                                  'return_value': ret_val,
-                                 'operation'   : f'({",".join(map(str,
-                                                    args))})',
+                                 'operation'   : f'({",".join(map(str, args))})',
                              })
         except Exception as e:
             worker.log_event('ips',
@@ -3133,7 +3132,7 @@ def _make_worker_args(num_workers: int, num_threads: int, use_shifter: bool, shi
             '--port', '0',
         ]
-        self.services.info(f'Scheduler args: {' '.join(args)}')
+        self.services.info(f'Scheduler args: {" ".join(args)}')
         self.dask_sched_popen = subprocess.Popen(args)
         self.dask_sched_pid = self.dask_sched_popen.pid
         self.services.info(f'Scheduler pid: '