11import logging
22import signal
3+ from contextlib import suppress
34from dataclasses import dataclass
4- from multiprocessing import Process , Queue
5+ from multiprocessing import Event , Process , Queue , current_process
6+ from multiprocessing .synchronize import Event as EventType
57from time import sleep
68from typing import Any , Callable , List , Optional
79
@@ -54,7 +56,7 @@ def handle(
5456 self ,
5557 workers : List [Process ],
5658 args : WorkerArgs ,
57- worker_func : Callable [[WorkerArgs ], None ],
59+ worker_func : Callable [[WorkerArgs , EventType ], None ],
5860 ) -> None :
5961 """
6062 This action reloads a single process.
@@ -73,22 +75,31 @@ def handle(
7375 logger .debug (f"Process { worker .name } is already terminated." )
7476 # Waiting worker shutdown.
7577 worker .join ()
78+ event : EventType = Event ()
7679 new_process = Process (
7780 target = worker_func ,
78- kwargs = {"args" : args },
81+ kwargs = {"args" : args , "event" : event },
7982 name = f"worker-{ self .worker_num } " ,
8083 daemon = True ,
8184 )
8285 new_process .start ()
8386 logger .info (f"Process { new_process .name } restarted with pid { new_process .pid } " )
8487 workers [self .worker_num ] = new_process
88+ _wait_for_worker_startup (new_process , event )
8589
8690
8791@dataclass
8892class ShutdownAction (ProcessActionBase ):
8993 """This action shuts down process manager loop."""
9094
9195
96+ def _wait_for_worker_startup (process : Process , event : EventType ) -> None :
97+ while process .is_alive ():
98+ with suppress (TimeoutError ):
99+ event .wait (0.1 )
100+ return
101+
102+
92103def schedule_workers_reload (
93104 action_queue : "Queue[ProcessActionBase]" ,
94105) -> None :
@@ -118,6 +129,9 @@ def get_signal_handler(
118129 """
119130
120131 def _signal_handler (signum : int , _frame : Any ) -> None :
132+ if current_process ().name .startswith ("worker" ):
133+ raise KeyboardInterrupt
134+
121135 logger .debug (f"Got signal { signum } ." )
122136 action_queue .put (ShutdownAction ())
123137 logger .warn ("Workers are scheduled for shutdown." )
@@ -137,8 +151,8 @@ class ProcessManager:
137151 def __init__ (
138152 self ,
139153 args : WorkerArgs ,
140- worker_function : Callable [[WorkerArgs ], None ],
141- observer : Optional [Observer ] = None ,
154+ worker_function : Callable [[WorkerArgs , EventType ], None ],
155+ observer : Optional [Observer ] = None , # type: ignore[valid-type]
142156 ) -> None :
143157 self .worker_function = worker_function
144158 self .action_queue : "Queue[ProcessActionBase]" = Queue (- 1 )
@@ -162,10 +176,12 @@ def __init__(
162176
163177 def prepare_workers (self ) -> None :
164178 """Spawn multiple processes."""
179+ events : List [EventType ] = []
165180 for process in range (self .args .workers ):
181+ event = Event ()
166182 work_proc = Process (
167183 target = self .worker_function ,
168- kwargs = {"args" : self .args },
184+ kwargs = {"args" : self .args , "event" : event },
169185 name = f"worker-{ process } " ,
170186 daemon = True ,
171187 )
@@ -176,6 +192,11 @@ def prepare_workers(self) -> None:
176192 work_proc .pid ,
177193 )
178194 self .workers .append (work_proc )
195+ events .append (event )
196+
197+ # Wait for workers startup
198+ for worker , event in zip (self .workers , events ):
199+ _wait_for_worker_startup (worker , event )
179200
180201 def start (self ) -> None : # noqa: C901, WPS213
181202 """
0 commit comments