Skip to content

Commit 71e54a2

Browse files
Propgate worker errors back to user via message bus (#99)
1 parent 963dfb7 commit 71e54a2

11 files changed

Lines changed: 266 additions & 161 deletions

File tree

src/blueapi/cli/amq.py

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import threading
2-
from typing import Any, Callable, List, Mapping, Optional, TypeVar, Union
2+
from typing import Any, Callable, Mapping, Optional, TypeVar
33

44
from blueapi.messaging import MessageContext, MessagingTemplate
55
from blueapi.service.model import (
@@ -9,11 +9,14 @@
99
PlanResponse,
1010
TaskResponse,
1111
)
12-
from blueapi.worker import TaskEvent
12+
from blueapi.worker import ProgressEvent, WorkerEvent
1313

1414
T = TypeVar("T")
1515

16-
_Json = Union[List[Any], Mapping[str, Any]]
16+
17+
class BlueskyRemoteError(Exception):
18+
def __init__(self, message: str) -> None:
19+
super().__init__(message)
1720

1821

1922
class AmqClient:
@@ -26,19 +29,33 @@ def run_plan(
2629
self,
2730
name: str,
2831
params: Mapping[str, Any],
29-
on_event: Optional[Callable[[TaskEvent], None]] = None,
32+
on_event: Optional[Callable[[WorkerEvent], None]] = None,
33+
on_progress_event: Optional[Callable[[ProgressEvent], None]] = None,
3034
timeout: Optional[float] = None,
3135
) -> str:
3236
complete = threading.Event()
3337

34-
def on_event_wrapper(ctx: MessageContext, event: TaskEvent) -> None:
38+
def on_event_wrapper(ctx: MessageContext, event: WorkerEvent) -> None:
3539
if on_event is not None:
3640
on_event(event)
37-
if event.is_task_terminated():
41+
42+
if event.is_complete():
3843
complete.set()
44+
if event.is_error():
45+
raise BlueskyRemoteError(str(event.errors) or "Unknown error")
46+
47+
def on_progress_event_wrapper(
48+
ctx: MessageContext, event: ProgressEvent
49+
) -> None:
50+
if on_progress_event is not None:
51+
on_progress_event(event)
3952

4053
self.app.subscribe(
41-
self.app.destinations.topic("public.worker.event.task"), on_event_wrapper
54+
self.app.destinations.topic("public.worker.event"), on_event_wrapper
55+
)
56+
self.app.subscribe(
57+
self.app.destinations.topic("public.worker.event.progress"),
58+
on_progress_event_wrapper,
4259
)
4360
# self.app.send("worker.run", {"name": name, "params": params})
4461
task_response = self.app.send_and_recieve(

src/blueapi/cli/cli.py

Lines changed: 10 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,16 @@
1-
import itertools
21
import json
32
import logging
43
from pathlib import Path
5-
from typing import Dict, Mapping, Optional
4+
from typing import Optional
65

76
import click
8-
from tqdm import tqdm
97

108
from blueapi import __version__
119
from blueapi.config import StompConfig
1210
from blueapi.messaging import StompMessagingTemplate
13-
from blueapi.worker import StatusView, TaskEvent
1411

1512
from .amq import AmqClient
13+
from .updates import CliEventRenderer
1614

1715

1816
@click.group(invoke_without_command=True)
@@ -91,52 +89,11 @@ def get_devices(ctx) -> None:
9189
@click.pass_context
9290
def run_plan(ctx, name: str, parameters: str) -> None:
9391
client: AmqClient = ctx.obj["client"]
94-
95-
renderer = ProgressBarRenderer()
96-
97-
def handle_event(event: TaskEvent) -> None:
98-
renderer.update(event.statuses)
99-
if event.is_task_terminated():
100-
print("")
101-
print("")
102-
print("")
103-
print("DONE")
104-
105-
client.run_plan(name, json.loads(parameters), handle_event, timeout=120.0)
106-
107-
108-
_BAR_FMT = "{desc}: |{bar}| {percentage:3.0f}% [{elapsed}/{remaining}]"
109-
110-
111-
class ProgressBarRenderer:
112-
_bars: Dict[str, tqdm]
113-
_count: itertools.count
114-
115-
def __init__(self) -> None:
116-
self._bars = {}
117-
self._count = itertools.count()
118-
119-
def update(self, status_view: Mapping[str, StatusView]) -> None:
120-
for name, view in status_view.items():
121-
if name not in self._bars:
122-
pos = next(self._count)
123-
self._bars[name] = tqdm(
124-
position=pos,
125-
total=1.0,
126-
initial=0.0,
127-
bar_format=_BAR_FMT,
128-
)
129-
self._update(name, view)
130-
131-
def _update(self, name: str, view: StatusView) -> None:
132-
bar = self._bars[name]
133-
if (
134-
view.current is not None
135-
and view.target is not None
136-
and view.initial is not None
137-
and view.percentage is not None
138-
and view.time_elapsed is not None
139-
):
140-
bar.desc = view.display_name
141-
bar.update(view.percentage - bar.n)
142-
bar.unit = view.unit
92+
renderer = CliEventRenderer()
93+
client.run_plan(
94+
name,
95+
json.loads(parameters),
96+
renderer.on_worker_event,
97+
renderer.on_progress_event,
98+
timeout=120.0,
99+
)

src/blueapi/cli/updates.py

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
import itertools
2+
from typing import Dict, Mapping, Optional, Union
3+
4+
from tqdm import tqdm
5+
6+
from blueapi.worker import ProgressEvent, StatusView, WorkerEvent
7+
8+
_BAR_FMT = "{desc}: |{bar}| {percentage:3.0f}% [{elapsed}/{remaining}]"
9+
10+
11+
class ProgressBarRenderer:
12+
_bars: Dict[str, tqdm]
13+
_count: itertools.count
14+
15+
def __init__(self) -> None:
16+
self._bars = {}
17+
self._count = itertools.count()
18+
19+
def update(self, status_view: Mapping[str, StatusView]) -> None:
20+
for name, view in status_view.items():
21+
if name not in self._bars:
22+
pos = next(self._count)
23+
self._bars[name] = tqdm(
24+
position=pos,
25+
total=1.0,
26+
initial=0.0,
27+
bar_format=_BAR_FMT,
28+
)
29+
self._update(name, view)
30+
31+
def _update(self, name: str, view: StatusView) -> None:
32+
bar = self._bars[name]
33+
if (
34+
view.current is not None
35+
and view.target is not None
36+
and view.initial is not None
37+
and view.percentage is not None
38+
and view.time_elapsed is not None
39+
):
40+
bar.desc = view.display_name
41+
bar.update(view.percentage - bar.n)
42+
bar.unit = view.unit
43+
44+
45+
class CliEventRenderer:
46+
_task_name: Optional[str]
47+
_pbar_renderer: ProgressBarRenderer
48+
49+
def __init__(
50+
self,
51+
task_name: Optional[str] = None,
52+
pbar_renderer: Optional[ProgressBarRenderer] = None,
53+
) -> None:
54+
self._task_name = task_name
55+
if pbar_renderer is None:
56+
pbar_renderer = ProgressBarRenderer()
57+
self._pbar_renderer = pbar_renderer
58+
59+
def on_progress_event(self, event: ProgressEvent) -> None:
60+
if self._relates_to_task(event):
61+
self._pbar_renderer.update(event.statuses)
62+
63+
def on_worker_event(self, event: WorkerEvent) -> None:
64+
if self._relates_to_task(event):
65+
print(str(event.state))
66+
67+
def _relates_to_task(self, event: Union[WorkerEvent, ProgressEvent]) -> bool:
68+
if self._task_name is None:
69+
return True
70+
elif isinstance(event, WorkerEvent):
71+
return (
72+
event.task_status is not None
73+
and event.task_status.task_name == self._task_name
74+
)
75+
elif isinstance(event, ProgressEvent):
76+
return event.task_name == self._task_name
77+
else:
78+
return False

src/blueapi/service/app.py

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
import logging
22
import uuid
33
from pathlib import Path
4-
from typing import Optional
4+
from typing import Mapping, Optional
55

66
from blueapi.config import ApplicationConfig
7-
from blueapi.core import BlueskyContext, DataEvent
7+
from blueapi.core import BlueskyContext, EventStream
88
from blueapi.messaging import MessageContext, MessagingTemplate, StompMessagingTemplate
99
from blueapi.utils import ConfigLoader
10-
from blueapi.worker import RunEngineWorker, RunPlan, TaskEvent, Worker, WorkerEvent
10+
from blueapi.worker import RunEngineWorker, RunPlan, Worker
1111

1212
from .model import (
1313
DeviceModel,
@@ -35,9 +35,20 @@ def __init__(self, config: ApplicationConfig) -> None:
3535

3636
def run(self) -> None:
3737
logging.basicConfig(level=self._config.logging.level)
38-
self._worker.worker_events.subscribe(self._on_worker_event)
39-
self._worker.task_events.subscribe(self._on_task_event)
40-
self._worker.data_events.subscribe(self._on_data_event)
38+
39+
self._publish_event_streams(
40+
{
41+
self._worker.worker_events: self._template.destinations.topic(
42+
"public.worker.event"
43+
),
44+
self._worker.progress_events: self._template.destinations.topic(
45+
"public.worker.event.progress"
46+
),
47+
self._worker.data_events: self._template.destinations.topic(
48+
"public.worker.event.data"
49+
),
50+
}
51+
)
4152

4253
self._template.connect()
4354

@@ -47,20 +58,14 @@ def run(self) -> None:
4758

4859
self._worker.run_forever()
4960

50-
def _on_worker_event(self, event: WorkerEvent) -> None:
51-
self._template.send(
52-
self._template.destinations.topic("public.worker.event"), event
53-
)
54-
55-
def _on_task_event(self, event: TaskEvent) -> None:
56-
self._template.send(
57-
self._template.destinations.topic("public.worker.event.task"), event
58-
)
61+
def _publish_event_streams(
62+
self, streams_to_destinations: Mapping[EventStream, str]
63+
) -> None:
64+
for stream, destination in streams_to_destinations.items():
65+
self._publish_event_stream(stream, destination)
5966

60-
def _on_data_event(self, event: DataEvent) -> None:
61-
self._template.send(
62-
self._template.destinations.topic("public.worker.event.data"), event
63-
)
67+
def _publish_event_stream(self, stream: EventStream, destination: str) -> None:
68+
stream.subscribe(lambda event: self._template.send(destination, event))
6469

6570
def _on_run_request(self, message_context: MessageContext, task: RunPlan) -> None:
6671
name = str(uuid.uuid1())

src/blueapi/startup/example.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
from ophyd.sim import Syn2DGauss, SynGauss, SynSignal
22

33
from blueapi.plans import * # noqa: F401, F403
4-
from blueapi.service.simmotor import SynAxisWithMotionEvents
4+
5+
from .simmotor import BrokenSynAxis, SynAxisWithMotionEvents
56

67
x = SynAxisWithMotionEvents(name="x", delay=1.0, events_per_move=8)
78
y = SynAxisWithMotionEvents(name="y", delay=3.0, events_per_move=24)
89
z = SynAxisWithMotionEvents(name="z", delay=2.0, events_per_move=16)
910
theta = SynAxisWithMotionEvents(
1011
name="theta", delay=0.2, events_per_move=12, egu="degrees"
1112
)
13+
x_err = BrokenSynAxis(name="x_err", timeout=1.0)
1214
sample_pressure = SynAxisWithMotionEvents(
1315
name="sample_pressure", delay=30.0, events_per_move=128, egu="MPa", value=0.101
1416
)
Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from typing import Callable, Optional
44

55
from ophyd.sim import SynAxis
6-
from ophyd.status import MoveStatus
6+
from ophyd.status import MoveStatus, Status
77

88

99
class SynAxisWithMotionEvents(SynAxis):
@@ -81,3 +81,14 @@ def sleep_and_finish():
8181
threading.Thread(target=sleep_and_finish, daemon=True).start()
8282

8383
return st
84+
85+
86+
class BrokenSynAxis(SynAxis):
87+
_timeout: float
88+
89+
def __init__(self, *, timeout: float, **kwargs) -> None:
90+
super().__init__(**kwargs)
91+
self._timeout = timeout
92+
93+
def set(self, value: float) -> Status:
94+
return Status(timeout=self._timeout)

src/blueapi/worker/__init__.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
1-
from .event import RunnerState, StatusView, TaskEvent, WorkerEvent
1+
from .event import ProgressEvent, StatusView, TaskStatus, WorkerEvent, WorkerState
22
from .multithread import run_worker_in_own_thread
33
from .reworker import RunEngineWorker
4-
from .task import RunPlan, Task, TaskState
4+
from .task import RunPlan, Task
55
from .worker import Worker
66

77
__all__ = [
88
"run_worker_in_own_thread",
99
"RunEngineWorker",
1010
"Task",
11-
"TaskState",
1211
"Worker",
1312
"RunPlan",
1413
"WorkerEvent",
15-
"RunnerState",
16-
"TaskEvent",
14+
"WorkerState",
1715
"StatusView",
16+
"ProgressEvent",
17+
"TaskStatus",
1818
]

0 commit comments

Comments
 (0)