File tree Expand file tree Collapse file tree
Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -262,6 +262,21 @@ void DataProcessingDevice::Init()
262262 mState .inputChannelInfos [ci].state = InputChannelState::Pull;
263263 }
264264 }
265+ // / This should post a message on the queue...
266+ SubscribeToNewTransition (" dpl" , [loop = mState .loop ](fair::mq::Transition t) {
267+ if (loop) {
268+ uv_async_t handle;
269+ int res = uv_async_init (loop, &handle, nullptr );
270+ if (res < 0 ) {
271+ LOG (ERROR) << " Unable to initialise subscription" ;
272+ }
273+ res = uv_async_send (&handle);
274+ if (res < 0 ) {
275+ LOG (ERROR) << " Unable to notify subscription" ;
276+ }
277+ LOG (debug) << " State transition requested" ;
278+ }
279+ });
265280}
266281
267282void on_signal_callback (uv_signal_t * handle, int signum)
@@ -438,6 +453,11 @@ bool DataProcessingDevice::ConditionalRun()
438453 shouldNotWait = true ;
439454 }
440455 uv_run (mState .loop , shouldNotWait ? UV_RUN_NOWAIT : UV_RUN_ONCE);
456+
457+ // A new state was requested, we exit.
458+ if (NewStatePending ()) {
459+ return false ;
460+ }
441461 }
442462
443463 // Notify on the main thread the new region callbacks, making sure
You can’t perform that action at this time.
0 commit comments