Skip to content

Commit f3905cd

Browse files
authored
Merge pull request #952 from cderici/optimize-teardown
#952 #### Description This solves a couple of issues with the connection teardown. The main observation is that we have a rogue connection that we open to the controller during model connection to get some model info but never close. That connection has its own pinger/receiver tasks and an internal websocket object (and that object has its own managing tasks, etc.), and because this connection is totally forgotten, these tasks are not handled properly. For a long time I thought the task errors we were intermittently receiving were about the tasks belonging to the main connection. This fixes it by just using the `ConnectedController` context manager to properly disconnect after use, which wraps up the tasks gracefully. Another change is that this reduces the websocket close timeout to 1 second (the default is 10). With the first change, we essentially won't need this, but in any case, we may want to experiment with this timeout to make sure we don't get false closes. #### QA Steps I used @jameinel's annotate script to reproduce the issues and investigate this, so I think it would serve well for QAing this change too: https://github.com/jameinel/test-annotate/blob/main/annotate.py
2 parents 4349a4a + c21cb93 commit f3905cd

6 files changed

Lines changed: 30 additions & 17 deletions

File tree

juju/client/connection.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -441,11 +441,15 @@ async def close(self, to_reconnect=False):
441441
self.monitor.close_called.set()
442442

443443
# Cancel all the tasks (that we started):
444+
tasks_need_to_be_gathered = []
444445
if self._pinger_task:
446+
tasks_need_to_be_gathered.append(self._pinger_task)
445447
self._pinger_task.cancel()
446448
if self._receiver_task:
449+
tasks_need_to_be_gathered.append(self._receiver_task)
447450
self._receiver_task.cancel()
448451
if self._debug_log_task:
452+
tasks_need_to_be_gathered.append(self._debug_log_task)
449453
self._debug_log_task.cancel()
450454

451455
if self._ws and not self._ws.closed:
@@ -454,9 +458,6 @@ async def close(self, to_reconnect=False):
454458
if not to_reconnect:
455459
try:
456460
log.debug('Gathering all tasks for connection close')
457-
458-
# Avoid gathering the current task
459-
tasks_need_to_be_gathered = [task for task in jasyncio.all_tasks() if task != jasyncio.current_task()]
460461
await jasyncio.gather(*tasks_need_to_be_gathered)
461462
except jasyncio.CancelledError:
462463
pass
@@ -603,6 +604,7 @@ async def _pinger(self):
603604
'''
604605
async def _do_ping():
605606
try:
607+
log.debug(f'Pinger {self._pinger_task}: pinging')
606608
await pinger_facade.Ping()
607609
except jasyncio.CancelledError:
608610
raise
@@ -642,7 +644,7 @@ async def rpc(self, msg, encoder=None):
642644
if "version" not in msg:
643645
msg['version'] = self.facades[msg['type']]
644646
outgoing = json.dumps(msg, indent=2, cls=encoder)
645-
log.debug('connection {} -> {}'.format(id(self), outgoing))
647+
log.debug('connection id: {} ---> {}'.format(id(self), outgoing))
646648
for attempt in range(3):
647649
if self.monitor.status == Monitor.DISCONNECTED:
648650
# closed cleanly; shouldn't try to reconnect
@@ -665,7 +667,7 @@ async def rpc(self, msg, encoder=None):
665667
log.error('RPC: Automatic reconnect failed')
666668
raise
667669
result = await self._recv(msg['request-id'])
668-
log.debug('connection {} <- {}'.format(id(self), result))
670+
log.debug('connection id : {} <--- {}'.format(id(self), result))
669671

670672
if not result:
671673
return result
@@ -796,6 +798,7 @@ async def reconnect(self):
796798
if not self.is_debug_log_connection:
797799
self._build_facades(res.get('facades', {}))
798800
if not self._pinger_task:
801+
log.debug('reconnect: scheduling a pinger task')
799802
self._pinger_task = jasyncio.create_task(self._pinger(), name="Task_Pinger")
800803

801804
async def _connect(self, endpoints):
@@ -852,6 +855,7 @@ async def _try_endpoint(endpoint, cacert, delay):
852855
# If this is regular connection, and we dont have a
853856
# receiver_task yet, then schedule a _receiver_task
854857
elif not self.is_debug_log_connection and not self._receiver_task:
858+
log.debug('_connect: scheduling a receiver task')
855859
self._receiver_task = jasyncio.create_task(self._receiver(), name="Task_Receiver")
856860

857861
log.debug("Driver connected to juju %s", self.addr)
@@ -907,6 +911,7 @@ async def _connect_with_redirect(self, endpoints):
907911
login_result = await self._connect_with_login(e.endpoints)
908912
self._build_facades(login_result.get('facades', {}))
909913
if not self._pinger_task:
914+
log.debug('_connect_with_redirect: scheduling a pinger task')
910915
self._pinger_task = jasyncio.create_task(self._pinger(), name="Task_Pinger")
911916

912917
# _build_facades takes the facade list that comes from the connection with the controller,

juju/client/connector.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,12 @@ async def connect(self, **kwargs):
7373
assert self._connection
7474
self._log_connection = await Connection.connect(**kwargs)
7575
else:
76+
# TODO (cderici): we need to investigate how to reuse/share
77+
# connections between Model & Controller objects
78+
# At least either avoid overwriting self._connection with
79+
# different types of connections (model, controller), or
80+
# have an indication of which type of entity this is
81+
# connected to.
7682
if self._connection:
7783
await self._connection.close()
7884
self._connection = await Connection.connect(**kwargs)
@@ -84,11 +90,11 @@ async def connect(self, **kwargs):
8490
if not self._connection.info['server-version'].startswith(SUPPORTED_MAJOR_VERSION):
8591
raise JujuConnectionError("juju server-version %s not supported" % juju_server_version)
8692

87-
async def disconnect(self):
93+
async def disconnect(self, entity):
8894
"""Shut down the watcher task and close websockets.
8995
"""
9096
if self._connection:
91-
log.debug('Closing model connection')
97+
log.debug(f'Connector: closing {entity} connection')
9298
await self._connection.close()
9399
self._connection = None
94100
if self._log_connection:

juju/controller.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ async def disconnect(self):
193193
"""Shut down the watcher task and close websockets.
194194
195195
"""
196-
await self._connector.disconnect()
196+
await self._connector.disconnect(entity='controller')
197197

198198
async def add_credential(self, name=None, credential=None, cloud=None,
199199
owner=None, force=False):

juju/model.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -771,10 +771,14 @@ async def watch_received_waiter():
771771
raise JujuError("AllWatcher task is finished abruptly without an exception.")
772772
raise self._watcher_task.exception()
773773

774-
if self.info is None:
775-
contr = await self.get_controller()
776-
self._info = await contr.get_model_info(model_name, model_uuid)
777-
log.debug('Got ModelInfo: %s', vars(self.info))
774+
if self._info is None:
775+
# TODO (cderici): See if this can be optimized away, or at least
776+
# be done lazily (i.e. not everytime after_connect, but whenever
777+
# self.info is needed -- which here can be bypassed if model_uuid
778+
# is known)
779+
async with ConnectedController(self.connection()) as contr:
780+
self._info = await contr.get_model_info(model_name, model_uuid)
781+
log.debug('Got ModelInfo: %s', vars(self.info))
778782

779783
self.uuid = self.info.uuid
780784

@@ -795,8 +799,7 @@ async def disconnect(self):
795799
self._watch_stopping.clear()
796800

797801
if self.is_connected():
798-
log.debug('Closing model connection')
799-
await self._connector.disconnect()
802+
await self._connector.disconnect(entity='model')
800803
self._info = None
801804

802805
async def add_local_charm_dir(self, charm_dir, series):

tests/integration/test_model.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,8 +223,6 @@ async def test_deploy_local_charm_channel(event_loop):
223223
async with base.CleanModel() as model:
224224
await model.deploy(str(charm_path), channel='stable')
225225
assert 'charm' in model.applications
226-
await model.wait_for_idle(status="active")
227-
assert model.units['charm/0'].workload_status == 'active'
228226

229227

230228
@base.bootstrapped

tests/integration/test_unit.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ async def is_leader_elected():
2626
return any([await u.is_leader_from_status() for u in app.units])
2727

2828
await utils.block_until_with_coroutine(is_leader_elected,
29-
timeout=480)
29+
timeout=480,
30+
wait_period=5)
3031

3132

3233
@base.bootstrapped

0 commit comments

Comments
 (0)