
Commit 12be4ec

Authored by cderici and Juan Tirado (juanmanuel-tirado)
[JUJU-3894] Forward port some fixes from 2.9 to master (#870)
* Add kubernetes as a supported series, as per juju/core/series/supported.go

  Fixes #865

* Add an example local charm to test and reproduce #865

  Unfortunately we can't write an integration test that uses this, because we run our tests on lxd: the charm will be deployed but will never actually get 'active'. We could test the deployment itself (that it doesn't error), but then 3.0 would fail to deploy on lxd because the series is kubernetes, so the test would be invalid anyway.

* Fix bug in Type.from_json() parsing simple entries

  Should fix #850 and #851.

* Add integration test for a simple assumes expression

  This is not the ideal test, because it depends on the upstream charm having the simple `assumes: -juju` expression. However, simulating the bug in facade.py for parsing such expressions is non-trivial, since we'd need a test that calls something like CharmsFacade.CharmInfo() to trigger parsing the metadata (which is where it actually fails in the reported bug). Creating a local charm wouldn't work, because we handle the metadata locally (without going through anything in facade.py, where the bug is located). Maybe we can manually call AddCharm in the test for a local charm and then manually call CharmInfo with the returned url.

* Fix wait_for_units flag to not block when enough units are ready

  wait_for_idle keeps waiting while there are fewer units available than requested (via the wait_for_units flag). However, if there is already a sufficient number of units in the desired status (at least wait_for_units), then it shouldn't block until the not-yet-available units reach that state as well.

  Fixes #837

* Add integration test for wait_for_units in wait_for_idle

* Fix failing wait_for_idle test

  As per discussion in #841 (comment). Should fix #837.

* Remove accidental printf left over from debugging

* Small patch for wait_for_idle

* Fix the wait_for_exact_units=0 case

* Fix logical typo

* Fix merge-resolve error for parsing assumes

* Fix base channel discovery for local charms

  `utils.get_local_charm_base()` was incorrectly using the `--channel` argument (the charm's channel) to discover the channel part of the base. (We should stop using the word 'channel' for two different things.) This fixes that by removing the incorrect part of the code.

  Should fix #839

* Fixes to pass the CI problems regarding the missing postgresql charm (#847)

* Add test for deploying a local charm with the channel argument

* Add application.get_status to get the most up-to-date status from the API

  Introduces an internal self._status, initially set to 'unknown', which has the lowest severity. The regular property self.status uses both self._status and the unit statuses to derive the most severe status as the application status.

* Use application.get_status in wait_for_idle to use the most up-to-date application status

* Fix unit test TestModelWaitForIdle::test_wait_for_active_status

* Fix linter

* Expect and handle exceptions from the AllWatcher task

  Fixes #829

  The `_all_watcher` task is a coroutine for the AllWatcher that runs in the background forever, driven by a while loop that's controlled manually through some flags (asyncio events), e.g. `_watch_stopping` and `_watch_stopped`. The problem is that when `_all_watcher` raises an exception (or receives one from a call like `get_config()`), that exception sits unhandled in the event loop and is never re-raised.

  This is because the coroutine is not `await`ed (for good reason): it can't be `await`ed, since there won't ever be a result; it's supposed to work in the background forever, fetching the deltas for us. As a result, if `_all_watcher` fails, then external flags like `_watch_received` are never set, and whoever calls `await self._watch_received.wait()` blocks forever (in this case `_after_connect()`). Similarly, `disconnect()` waits for the `_watch_stopped` flag, which won't be set either, so calling disconnect after the all_watcher has failed hangs forever.

  This change fixes the problem by waiting (at the wait-for-flag spots) for two things: 1) whichever flag we're waiting for, and 2) the `_all_watcher` task being `done()`. In the latter case, we should expect to see an exception, because that task is not supposed to finish. More importantly, if we see that `_watcher_task.done()`, then we don't sit and wait forever for the `_all_watcher` event flags to be set, so we won't hang. A nice side effect is that we should see fewer extra "Task exception is never handled" outputs, since we now call `.exception()` on the `_all_watcher` task. We'll probably continue to get those from tasks like `_pinger` and `_debug_log`, but this is a good first example of how to handle those as well.

* Assume an ubuntu focal base for legacy k8s charms

  Updates get_local_charm_base with Base(20.04/stable, ubuntu) for legacy k8s charms, as per juju/cmd/juju/application/utils.DeduceOrigin()

* Fix get_local_charm_base call

---------

Co-authored-by: Juan M. Tirado <juanmanuel-tirado@users.noreply.github.com>
Co-authored-by: Juan Tirado <juan.tirado@canonical.com>
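The AllWatcher fix described above boils down to a general asyncio pattern: never block solely on an event that only a background task can set; wait on the event and the task together, and surface the task's exception instead of hanging. A minimal standalone sketch of that pattern (names like `crashing_watcher` and `wait_ready_or_crash` are illustrative stand-ins, not the real libjuju internals):

```python
import asyncio

async def crashing_watcher(ready: asyncio.Event):
    # Stands in for the _all_watcher loop: it dies before ever
    # setting the event that callers are blocked on.
    raise RuntimeError("watcher failed")

async def wait_ready_or_crash(watcher_task: asyncio.Task, ready: asyncio.Event):
    # Wait on BOTH the event and the background task; whichever
    # finishes first decides what happens next.
    waiter = asyncio.create_task(ready.wait())
    done, _pending = await asyncio.wait(
        [waiter, watcher_task], return_when=asyncio.FIRST_COMPLETED)
    if watcher_task in done:
        waiter.cancel()
        exc = watcher_task.exception()
        if exc is None:
            raise RuntimeError("watcher finished abruptly without an exception")
        raise exc  # surface the background failure instead of hanging

async def main():
    ready = asyncio.Event()
    task = asyncio.create_task(crashing_watcher(ready))
    try:
        await wait_ready_or_crash(task, ready)
    except RuntimeError as e:
        return str(e)

result = asyncio.run(main())
```

Without the combined wait, `ready.wait()` would block forever here, which is exactly the hang the commit fixes in `_after_connect()` and `disconnect()`.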
1 parent 2596a20 commit 12be4ec

10 files changed

Lines changed: 207 additions & 65 deletions


juju/application.py

Lines changed: 25 additions & 3 deletions
@@ -91,10 +91,15 @@ def status(self):
         """
         status = self.safe_data['status']['current']
         if status == "unset":
-            unit_status = []
+            known_statuses = []
             for unit in self.units:
-                unit_status.append(unit.workload_status)
-            return derive_status(unit_status)
+                known_statuses.append(unit.workload_status)
+            # If the self.get_status() is called (i.e. the status
+            # is received by FullStatus from the API) then add
+            # that into this computation as it might be more up
+            # to date (and more severe).
+            known_statuses.append(self._status)
+            return derive_status(known_statuses)
         return status

     @property
@@ -465,6 +470,23 @@ async def get_actions(self, schema=False):
             actions = {k: v.description for k, v in actions.items()}
         return actions

+    async def get_status(self):
+        """Get the application status using info from the FullStatus
+        as well, because it might be more up to date than our model
+
+        :return: str status
+        """
+
+        client_facade = client.ClientFacade.from_connection(self.connection)
+
+        full_status = await client_facade.FullStatus(patterns=None)
+        _app = full_status.applications.get(self.name, None)
+        if not _app:
+            raise JujuError(f"application is not in FullStatus : {self.name}")
+
+        self._status = derive_status([self.status, _app.status.status])
+        return self._status
+
     def attach_resource(self, resource_name, file_name, file_obj):
         """Updates the resource for an application by uploading file from
         local disk to the Juju controller.
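The `get_status` addition above merges the locally cached status with the API's `FullStatus` view by severity, via libjuju's `derive_status`. A minimal sketch of that severity-merge idea; the ordering list below is an assumption for illustration, and `derive_status_sketch` is a hypothetical stand-in, not the real function:

```python
# Assumed severity ordering, lowest to highest; the real ordering
# lives inside libjuju's derive_status.
SEVERITY = ["unknown", "active", "waiting", "maintenance", "blocked", "error"]

def derive_status_sketch(statuses):
    """Return the most severe of the candidate statuses."""
    return max(statuses, key=SEVERITY.index)

# Merging a cached status with a fresher FullStatus value keeps
# whichever is more severe, which is why 'unknown' (lowest) is a
# safe initial value for self._status.
worst = derive_status_sketch(["active", "unknown", "blocked"])
```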

juju/bundle.py

Lines changed: 12 additions & 2 deletions
@@ -27,6 +27,7 @@ class BundleHandler:
     Handle bundles by using the API to translate bundle YAML into a plan of
     steps and then dispatching each of those using the API.
     """
+
     def __init__(self, model, trusted=False, forced=False):
         self.model = model
         self.trusted = trusted
@@ -159,8 +160,7 @@ async def _handle_local_charms(self, bundle, bundle_dir):
                 apps_dict[app_name]["resources"] = resources
             origin = client.CharmOrigin(source="local", risk="stable")
             if not self.model.connection().is_using_old_client:
-                origin.base = utils.get_local_charm_base(series, '',
-                                                         metadata,
+                origin.base = utils.get_local_charm_base(series,
                                                          charm_dir,
                                                          client.Base)
             self.origins[charm_url] = {str(None): origin}
@@ -523,6 +523,7 @@ class AddApplicationChange(ChangeInfo):
     :local_resources: identifies the path to the local resource of the
         application's charm.
     """
+
     def __init__(self, change_id, requires, params=None):
         super(AddApplicationChange, self).__init__(change_id, requires)

@@ -669,6 +670,7 @@ class AddCharmChange(ChangeInfo):
         not sufficient.
     :channel: preferred channel for obtaining the charm.
     """
+
     def __init__(self, change_id, requires, params=None):
         super(AddCharmChange, self).__init__(change_id, requires)

@@ -770,6 +772,7 @@ class AddMachineChange(ChangeInfo):
         "lxc" or "kvm"). It is not specified for top level machines.
     :parent_id: id of the parent machine.
     """
+
     def __init__(self, change_id, requires, params=None):
         super(AddMachineChange, self).__init__(change_id, requires)
         # this one is weird, as it returns a set of parameters inside a list.

@@ -861,6 +864,7 @@ class AddRelationChange(ChangeInfo):
         application, and the interface is optional. Examples are
         "$deploy-42:web", "$deploy-42", "mysql:db".
     """
+
     def __init__(self, change_id, requires, params=None):
         super(AddRelationChange, self).__init__(change_id, requires)

@@ -922,6 +926,7 @@ class AddUnitChange(ChangeInfo):
     :to: optional location where to add the unit, as a placeholder
         pointing to another unit change or to a machine change.
     """
+
     def __init__(self, change_id, requires, params=None):
         super(AddUnitChange, self).__init__(change_id, requires)

@@ -990,6 +995,7 @@ class CreateOfferChange(ChangeInfo):
         offer.
     :offer_name: describes the offer name.
     """
+
     def __init__(self, change_id, requires, params=None):
         super(CreateOfferChange, self).__init__(change_id, requires)

@@ -1044,6 +1050,7 @@ class ConsumeOfferChange(ChangeInfo):
     :url: contains the location of the offer
     :application_name: describes the application name on offer.
     """
+
     def __init__(self, change_id, requires, params=None):
         super(ConsumeOfferChange, self).__init__(change_id, requires)

@@ -1099,6 +1106,7 @@ class ExposeChange(ChangeInfo):
         that should be able to access the port ranges that the application
         has opened for each endpoint.
     """
+
     def __init__(self, change_id, requires, params=None):
         super(ExposeChange, self).__init__(change_id, requires)

@@ -1148,6 +1156,7 @@ class ScaleChange(ChangeInfo):
     :application: placeholder name of the application to be scaled.
     :scale: is the new scale value to use.
     """
+
     def __init__(self, change_id, requires, params=None):
         super(ScaleChange, self).__init__(change_id, requires)

@@ -1200,6 +1209,7 @@ class SetAnnotationsChange(ChangeInfo):
     :entity_type: type of the entity, "application" or "machine".
     :ennotations: annotations as key/value pairs.
     """
+
     def __init__(self, change_id, requires, params=None):
         super(SetAnnotationsChange, self).__init__(change_id, requires)

juju/client/facade.py

Lines changed: 1 addition & 1 deletion
@@ -685,7 +685,7 @@ def from_json(cls, data):
                 value = entry[i:].strip()
                 d[key] = value
             else:
-                # something like k8s-api
+                # this is a simple entry
                 d[entry] = ''
         return cls(**d)
     return None
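The `from_json` fix above is about the parsing rule itself: an entry is either a "key: value" pair or a bare simple entry like "k8s-api", which must map to an empty value instead of crashing the parser. A hedged sketch mirroring that intent (the `parse_entry` helper and the sample entries are made up for illustration, not the real facade code):

```python
def parse_entry(entry: str, d: dict) -> None:
    # Split on the first ':' if present; otherwise treat the whole
    # string as a simple entry mapping to an empty value.
    i = entry.find(":")
    if i != -1:
        d[entry[:i].strip()] = entry[i + 1:].strip()
    else:
        # a simple entry, e.g. "k8s-api"
        d[entry] = ''

parsed = {}
for e in ("series: focal", "k8s-api"):
    parse_entry(e, parsed)
```

The reported bug (#850/#851) was the simple-entry branch; bare names like "k8s-api" were not being handled.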

juju/model.py

Lines changed: 62 additions & 14 deletions
@@ -262,6 +262,7 @@ def __init__(self, entity_id, model, history_index=-1, connected=True):
         self._history_index = history_index
         self.connected = connected
         self.connection = model.connection()
+        self._status = 'unknown'

     def __repr__(self):
         return '<{} entity_id="{}">'.format(type(self).__name__,
@@ -740,7 +741,26 @@ async def _after_connect(self, model_name=None, model_uuid=None):
         # we've received all the model data, which might be
         # a whole load of unneeded data if all the client wants
         # to do is make one RPC call.
-        await self._watch_received.wait()
+        async def watch_received_waiter():
+            await self._watch_received.wait()
+        waiter = jasyncio.create_task(watch_received_waiter())
+
+        # If we just wait for the _watch_received event and the _all_watcher
+        # task fails (e.g. because the API fails while a migration is in
+        # progress), then we'll hang because _watch_received will never be set.
+        # Instead, we wait for two things: 1) _watch_received, 2) _all_watcher done.
+        # If _all_watcher is done before _watch_received, then we should see
+        # (and raise) an exception coming from the _all_watcher.
+        # Otherwise (i.e. _watch_received is set), we're good to go.
+        done, pending = await jasyncio.wait([waiter, self._watcher_task],
+                                            return_when=jasyncio.FIRST_COMPLETED)
+        if self._watcher_task in done:
+            # Cancel the _watch_received.wait
+            waiter.cancel()
+            # If there's no exception, then why did the _all_watcher break its loop?
+            if not self._watcher_task.exception():
+                raise JujuError("AllWatcher task finished abruptly without an exception.")
+            raise self._watcher_task.exception()

         if self.info is None:
             contr = await self.get_controller()
@@ -756,6 +776,12 @@ async def disconnect(self):
         if not self._watch_stopped.is_set():
             log.debug('Stopping watcher task')
             self._watch_stopping.set()
+            # If the _all_watcher task is finished, check for an
+            # exception; if there is one, raise it, otherwise we
+            # should see the _watch_stopped flag get set
+            if self._watcher_task.done() and self._watcher_task.exception():
+                raise self._watcher_task.exception()
             await self._watch_stopped.wait()
             self._watch_stopping.clear()

@@ -1133,6 +1159,7 @@ def _watch(self):
         See :meth:`add_observer` to register an onchange callback.

         """
+
         def _post_step(obj):
             # Once we get the model, ensure we're running in the correct state
             # as a post step.
@@ -1222,7 +1249,7 @@ async def _all_watcher():
         self._watch_received.clear()
         self._watch_stopping.clear()
         self._watch_stopped.clear()
-        jasyncio.ensure_future(_all_watcher())
+        self._watcher_task = jasyncio.create_task(_all_watcher())

     async def _notify_observers(self, delta, old_obj, new_obj):
         """Call observing callbacks, notifying them of a change in model state
@@ -1744,11 +1771,10 @@ async def deploy(
             # We have a local charm dir that needs to be uploaded
             charm_dir = os.path.abspath(os.path.expanduser(identifier))
             metadata = utils.get_local_charm_metadata(charm_dir)
-            charm_series = charm_series or await get_charm_series(metadata,
-                                                                  self)
-            if not base:
-                charm_origin.base = utils.get_local_charm_base(
-                    charm_series, channel, metadata, charm_dir, client.Base)
+            charm_series = charm_series or await get_charm_series(metadata, self)
+
+            base = utils.get_local_charm_base(charm_series, charm_dir, client.Base)
+            charm_origin.base = base

             if not application_name:
                 application_name = metadata['name']
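The hunk above, together with the "Assume ubuntu focal base for legacy k8s charms" bullet in the commit message, changes how the base is derived from the series alone, without the charm's `--channel`. A hedged sketch of that rule; `Base` here is a stand-in dataclass (not the real `client.Base`), and the series-to-version map is an illustrative subset, not libjuju's actual table:

```python
from dataclasses import dataclass

@dataclass
class Base:
    channel: str
    name: str

# Illustrative subset of the ubuntu series-to-version mapping.
SERIES_VERSIONS = {"focal": "20.04", "jammy": "22.04"}

def guess_local_charm_base(series: str) -> Base:
    # Legacy k8s charms get an assumed ubuntu focal base, as per
    # juju/cmd/juju/application/utils.DeduceOrigin()
    if series == "kubernetes":
        return Base("20.04/stable", "ubuntu")
    # Otherwise the base channel comes from the series, never from
    # the charm's --channel argument (the bug being fixed here).
    return Base(SERIES_VERSIONS[series] + "/stable", "ubuntu")
```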
@@ -2497,7 +2523,7 @@ async def _get_source_api(self, url, controller_name=None):

     async def wait_for_idle(self, apps=None, raise_on_error=True, raise_on_blocked=False,
                             wait_for_active=False, timeout=10 * 60, idle_period=15, check_freq=0.5,
-                            status=None, wait_for_units=1, wait_for_exact_units=None):
+                            status=None, wait_for_units=None, wait_for_exact_units=None):
         """Wait for applications in the model to settle into an idle state.

         :param apps (list[str]): Optional list of specific app names to wait on.
@@ -2545,6 +2571,8 @@ async def wait_for_idle(self, apps=None, raise_on_error=True, raise_on_blocked=F
             warnings.warn("wait_for_active is deprecated; use status", DeprecationWarning)
             status = "active"

+        _wait_for_units = wait_for_units if wait_for_units is not None else 1
+
         timeout = timedelta(seconds=timeout) if timeout is not None else None
         idle_period = timedelta(seconds=idle_period)
         start_time = datetime.now()
start_time = datetime.now()
@@ -2556,6 +2584,7 @@ async def wait_for_idle(self, apps=None, raise_on_error=True, raise_on_blocked=F

         apps = apps or self.applications
         idle_times = {}
+        units_ready = set()  # The units that are in the desired state
         last_log_time = None
         log_interval = timedelta(seconds=30)
@@ -2581,6 +2610,9 @@ def _raise_for_status(entities, status):
                 'Invalid value for wait_for_exact_units : %s' % wait_for_exact_units

         while True:
+            # The list 'busy' is what keeps this loop going,
+            # i.e. it'll stop when busy is empty after all the
+            # units are scanned
             busy = []
             errors = {}
             blocks = {}
@@ -2589,19 +2621,28 @@ def _raise_for_status(entities, status):
                     busy.append(app_name + " (missing)")
                     continue
                 app = self.applications[app_name]
-                if raise_on_error and app.status == "error":
+                app_status = await app.get_status()
+                if raise_on_error and app_status == "error":
                     errors.setdefault("App", []).append(app.name)
-                if raise_on_blocked and app.status == "blocked":
+                if raise_on_blocked and app_status == "blocked":
                     blocks.setdefault("App", []).append(app.name)
+
+                # Check if the wait_for_exact_units flag is used
                 if wait_for_exact_units is not None:
                     if len(app.units) != wait_for_exact_units:
                         busy.append(app.name + " (waiting for exactly %s units, current : %s)" %
                                     (wait_for_exact_units, len(app.units)))
                         continue
-                elif len(app.units) < wait_for_units:
+                # If we have fewer units than required, then wait a bit more
+                elif len(app.units) < _wait_for_units:
                     busy.append(app.name + " (not enough units yet - %s/%s)" %
-                                (len(app.units), wait_for_units))
+                                (len(app.units), _wait_for_units))
                     continue
+                # The user wants to see a certain number of units, and we have enough
+                elif wait_for_units and len(units_ready) >= _wait_for_units:
+                    # No need to keep looking; we have the desired number of units
+                    # ready to go, so exit the loop. Don't return, though; we
+                    # might still have some errors to raise
+                    break
                 for unit in app.units:
                     if unit.machine is not None and unit.machine.status == "error":
                         errors.setdefault("Machine", []).append(unit.machine.id)
@@ -2615,10 +2656,17 @@ def _raise_for_status(entities, status):
                     if raise_on_blocked and unit.workload_status == "blocked":
                         blocks.setdefault("Unit", []).append(unit.name)
                         continue
-                    waiting_for_status = status and unit.workload_status != status
-                    if not waiting_for_status and unit.agent_status == "idle":
+                    waiting_for_a_particular_status = status and unit.workload_status != status
+                    if not waiting_for_a_particular_status and unit.agent_status == "idle":
+                        # We'll be here in two cases:
+                        # 1) We're not waiting for a particular status and the agent is "idle"
+                        # 2) We're waiting for a particular status and the workload is in that status
+                        # Either way, the unit is ready; start measuring the time period that
+                        # it needs to stay in that state (i.e. idle_period)
+                        units_ready.add(unit.name)
                         now = datetime.now()
                         idle_start = idle_times.setdefault(unit.name, now)
+
                         if now - idle_start < idle_period:
                             busy.append("{} [{}] {}: {}".format(unit.name,
                                                                 unit.agent_status,
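The two hunks above implement the wait_for_units fix (#837): track which units have reached the desired state, and stop waiting once that set is large enough, even if other units are still settling. A condensed sketch of that decision logic; `enough_units_ready` is a hypothetical helper extracted here for illustration, not a function in libjuju:

```python
def enough_units_ready(units_ready: set, wait_for_units, available_units: int) -> bool:
    # Mirror the _wait_for_units fallback: default to 1 when the
    # caller didn't ask for a specific count.
    target = wait_for_units if wait_for_units is not None else 1
    if available_units < target:
        return False  # not enough units exist yet; keep waiting
    # Only short-circuit when the caller explicitly passed
    # wait_for_units and enough units are already in the desired state.
    return bool(wait_for_units) and len(units_ready) >= target
```

With this, two ready units out of three satisfy `wait_for_units=2` immediately, instead of blocking until the third unit also settles.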
