Skip to content

Commit 47c8a0c

Browse files
committed
Show groups and work at various locations
This refactors ping responses to show current work and broadcasts capabilities and groups when performing router advertisements
1 parent d611cd6 commit 47c8a0c

4 files changed

Lines changed: 14 additions & 4 deletions

File tree

receptor/connection.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ def send_route_advertisement(self, edges=None, seen=[]):
5454
buf.push(json.dumps({
5555
"cmd": "ROUTE",
5656
"id": self.receptor.node_id,
57+
"capabilities": self.receptor.work_manager.get_capabilities(),
58+
"groups": self.receptor.config.node_groups,
5759
"edges": edges,
5860
"seen": seens
5961
}).encode("utf-8"))

receptor/messages/directive.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import datetime
22
import logging
3+
import json
34

45
from ..exceptions import UnknownDirective
56
from . import envelope
@@ -21,7 +22,7 @@ async def __call__(self, router, inner_env):
2122
if action not in self.CONTROL_DIRECTIVES:
2223
raise UnknownDirective(f'Unknown control directive: {action}')
2324
action_method = getattr(self, action)
24-
responses = action_method(inner_env)
25+
responses = action_method(router.receptor, inner_env)
2526
serial = 0
2627
async for response in responses:
2728
serial += 1
@@ -34,9 +35,12 @@ async def __call__(self, router, inner_env):
3435
)
3536
await router.send(enveloped_response)
3637

37-
async def ping(self, inner_env):
38+
async def ping(self, receptor, inner_env):
3839
logger.info(f'Received ping from {inner_env.sender}')
39-
yield f'{inner_env.raw_payload}|{datetime.datetime.utcnow().isoformat()}'
40+
return_data = dict(initial_time=inner_env.raw_payload,
41+
response_time=str(datetime.datetime.utcnow()),
42+
active_work=receptor.work_manager.get_work())
43+
yield json.dumps(return_data)
4044

4145

4246
control = Control()

receptor/protocol.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,8 @@ def send_handshake(self):
110110
"id": self.receptor.node_id,
111111
"expire_time": time.time() + 10,
112112
"meta": dict(capabilities=self.receptor.work_manager.get_capabilities(),
113-
groups=self.receptor.config.node_groups),
113+
groups=self.receptor.config.node_groups,
114+
work=self.receptor.work_manager.get_work())
114115
}).encode("utf-8")
115116
self.transport.write(msg + DELIM)
116117

receptor/work.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ def load_receptor_worker(self, name):
2222
def get_capabilities(self):
2323
return [x.name for x in pkg_resources.iter_entry_points('receptor.worker')]
2424

25+
def get_work(self):
26+
return self.active_work
27+
2528
def add_work(self, env):
2629
self.active_work.append(dict(id=env.message_id,
2730
directive=env.directive,

0 commit comments

Comments
 (0)