Commit fa0ae50

Expect and handle exceptions from the AllWatcher task
Fixes #829.

The `_all_watcher` task is a coroutine that the AllWatcher runs in the background for as long as the model is connected. It is built around a while loop that is controlled manually through flags (asyncio events) such as `_watch_stopping` and `_watch_stopped`.

The problem is that when `_all_watcher` raises an exception (or receives one from a call such as `get_config()`), the exception is lost in the event loop: it is neither handled nor re-raised. This is because the coroutine is never `await`ed, and for good reason: it cannot be `await`ed because it never produces a result. It is supposed to keep working in the background, collecting the deltas for us.

As a result, if `_all_watcher` fails, external flags such as `_watch_received` are never set, and whoever is calling `await self._watch_received.wait()` blocks forever (in this case, `_after_connect()`). Similarly, `disconnect()` waits for the `_watch_stopped` flag, which is never set either, so calling `disconnect()` after `_all_watcher` has failed hangs forever.

This change fixes the problem by letting the wait-for-flag spots watch for two things: 1) whichever flag we are waiting for, and 2) the `_all_watcher` task being `done()`. In the latter case we expect to see an exception, because that task is not supposed to finish. More importantly, once we see `_watcher_task.done()`, we no longer sit and wait for the `_all_watcher` event flags to be set, so we do not hang.

A nice side effect is that we should get fewer of the extra "Task exception was never retrieved" messages, since we now call `.exception()` on the `_all_watcher` task. We will probably continue to get those from tasks such as `_pinger` and `_debug_log`, but this change is a good first example of how to handle them as well.
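The waiting pattern described in this message can be sketched outside of libjuju with plain `asyncio`. This is only an illustration under stated assumptions, not the code in `juju/model.py`: the helper `wait_for_event_or_task`, the `WatcherFailed` exception (a stand-in for `JujuError`), and the two demo coroutines are hypothetical names introduced here.

```python
import asyncio


class WatcherFailed(Exception):
    """Hypothetical stand-in for JujuError in this sketch."""


async def wait_for_event_or_task(event: asyncio.Event, task: asyncio.Task) -> None:
    """Return once `event` is set; raise if the background `task` finishes first."""
    waiter = asyncio.create_task(event.wait())
    try:
        done, _pending = await asyncio.wait(
            {waiter, task}, return_when=asyncio.FIRST_COMPLETED
        )
    finally:
        # Harmless if the waiter already finished; otherwise stops it lingering.
        waiter.cancel()

    if task in done:
        # The background task is never supposed to finish, so report why it did.
        if task.cancelled():
            raise WatcherFailed("background task was cancelled")
        exc = task.exception()
        if exc is None:
            raise WatcherFailed("background task finished without an exception")
        raise exc


async def demo_failure() -> str:
    """The watcher dies before setting the event: the caller sees the error."""
    received = asyncio.Event()

    async def failing_watcher():
        await asyncio.sleep(0)
        raise RuntimeError("migration in progress")

    task = asyncio.create_task(failing_watcher())
    try:
        await wait_for_event_or_task(received, task)
    except RuntimeError as exc:
        return f"raised: {exc}"
    return "received"


async def demo_success() -> str:
    """The watcher sets the event and keeps running: the caller proceeds."""
    received = asyncio.Event()

    async def healthy_watcher():
        received.set()
        await asyncio.sleep(3600)

    task = asyncio.create_task(healthy_watcher())
    try:
        await wait_for_event_or_task(received, task)
        return "received"
    finally:
        task.cancel()


if __name__ == "__main__":
    print(asyncio.run(demo_failure()))
    print(asyncio.run(demo_success()))
```

Unlike the committed code, this sketch also checks `task.cancelled()` before calling `task.exception()`, because `Task.exception()` raises `CancelledError` on a cancelled task. Whether that case can occur for `_watcher_task` is not something this commit states.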
1 parent eba8dec commit fa0ae50

1 file changed

Lines changed: 28 additions & 2 deletions

juju/model.py

@@ -759,7 +759,26 @@ async def _after_connect(self):
         # we've received all the model data, which might be
         # a whole load of unneeded data if all the client wants
         # to do is make one RPC call.
-        await self._watch_received.wait()
+        async def watch_received_waiter():
+            await self._watch_received.wait()
+        waiter = jasyncio.create_task(watch_received_waiter())
+
+        # If we just wait for the _watch_received event and the _all_watcher task
+        # fails (e.g. because API fails like migration is in progress), then
+        # we'll hang because the _watch_received will never be set
+        # Instead, we watch for two things, 1) _watch_received, 2) _all_watcher done
+        # If _all_watcher is done before the _watch_received, then we should see
+        # (and raise) an exception coming from the _all_watcher
+        # Otherwise (i.e. _watch_received is set), then we're good to go
+        done, pending = await jasyncio.wait([waiter, self._watcher_task],
+                                            return_when=jasyncio.FIRST_COMPLETED)
+        if self._watcher_task in done:
+            # Cancel the _watch_received.wait
+            waiter.cancel()
+            # If there's no exception, then why did the _all_watcher broke its loop?
+            if not self._watcher_task.exception():
+                raise JujuError("AllWatcher task is finished abruptly without an exception.")
+            raise self._watcher_task.exception()
 
         await self.get_info()
         self.uuid = self.info.uuid
@@ -771,6 +790,12 @@ async def disconnect(self):
         if not self._watch_stopped.is_set():
             log.debug('Stopping watcher task')
             self._watch_stopping.set()
+            # If the _all_watcher task is finished,
+            # check to see an exception, if yes, raise,
+            # otherwise we should see the _watch_stopped
+            # flag is set
+            if self._watcher_task.done() and self._watcher_task.exception():
+                raise self._watcher_task.exception()
             await self._watch_stopped.wait()
             self._watch_stopping.clear()
 
@@ -1040,6 +1065,7 @@ def _watch(self):
         See :meth:`add_observer` to register an onchange callback.
 
         """
+
         def _post_step(obj):
             # Once we get the model, ensure we're running in the correct state
             # as a post step.
@@ -1129,7 +1155,7 @@ async def _all_watcher():
         self._watch_received.clear()
         self._watch_stopping.clear()
         self._watch_stopped.clear()
-        jasyncio.ensure_future(_all_watcher())
+        self._watcher_task = jasyncio.create_task(_all_watcher())
 
     async def _notify_observers(self, delta, old_obj, new_obj):
         """Call observing callbacks, notifying them of a change in model state
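
The guard added to `disconnect()` relies on `Task.done()` and `Task.exception()`. As a rough illustration of how those calls behave for a failed, a still-running, and a cancelled task, here is a small sketch using plain `asyncio`. The helper `failure_of` and the demo coroutine are hypothetical and not part of libjuju.

```python
import asyncio
from typing import List, Optional


def failure_of(task: asyncio.Task) -> Optional[BaseException]:
    """Return the exception that ended `task`, or None if it is still
    running or finished cleanly."""
    if not task.done():
        # Task.exception() would raise InvalidStateError on an unfinished task.
        return None
    if task.cancelled():
        # Task.exception() would raise CancelledError here; return one instead.
        return asyncio.CancelledError()
    # Calling exception() also marks the exception as retrieved, which avoids
    # the "Task exception was never retrieved" log message.
    return task.exception()


async def demo() -> List[str]:
    async def boom():
        raise ValueError("watcher failed")

    async def forever():
        await asyncio.sleep(3600)

    failed = asyncio.create_task(boom())
    running = asyncio.create_task(forever())
    cancelled = asyncio.create_task(forever())

    cancelled.cancel()
    # Wait until the failing and the cancelled task have both finished.
    await asyncio.wait({failed, cancelled})

    results = [type(failure_of(t)).__name__ for t in (failed, running, cancelled)]
    running.cancel()  # clean up the task that is still sleeping
    return results


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

A check of this shape before `await self._watch_stopped.wait()` would let the caller tell a failed watcher apart from one that is still running. A task that finished without an exception and without setting the flag would still need separate handling.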
