Skip to content

Commit 5123752

Browse files
authored
Merge pull request #35 from matburt/grouping_and_metadata
Grouping and metadata
2 parents 0da9624 + 4f8771d commit 5123752

7 files changed

Lines changed: 56 additions & 13 deletions

File tree

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ else
1717
endif
1818
endif
1919

20-
.PHONY: dist sdist
20+
.PHONY: dist sdist test
2121

2222
clean:
2323
rm -rf dist

receptor/config.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,16 @@ def __init__(self, args=None):
144144
value_type='int',
145145
hint='If specified, the node will ping all other known nodes in the mesh every N seconds. The default is -1, meaning no pings are sent.',
146146
)
147+
self.add_config_option(
148+
section='node',
149+
key='groups',
150+
short_option='-g',
151+
long_option='--group',
152+
default_value=[],
153+
value_type='list',
154+
listof='str',
155+
hint='Define membership in one or more groups to aid in message routing',
156+
)
147157
# Controller options
148158
self.add_config_option(
149159
section='controller',
@@ -194,7 +204,8 @@ def __init__(self, args=None):
194204
key='delay',
195205
default_value=0,
196206
value_type='float',
197-
hint='The delay (in seconds) to wait between pings. If unspecified here or in a config file pings will be sent as soon as the previous response is received.',
207+
hint='The delay (in seconds) to wait between pings. If unspecified here or in a'
208+
'config file pings will be sent as soon as the previous response is received.',
198209
)
199210
self.add_config_option(
200211
section='ping',
@@ -358,6 +369,11 @@ def parse_options(self, args):
358369
# because env variables and configparser do not enforce the
359370
# value type, we do it now to ensure we have the type we want
360371
self._enforce_entry_type(entry)
372+
# Parse plugin_ sections to populate plugin configuration
373+
self._config_options['plugins'] = {}
374+
if self._config_file:
375+
for section in filter(lambda x: x.startswith("plugin_"), self._config_file.sections()):
376+
self._config_options['plugins'][section.replace("plugin_", "")] = dict(self._config_file[section])
361377

362378
def _enforce_entry_type(self, entry):
363379
if entry.value is not None:

receptor/connection.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,9 @@
1212

1313

1414
class Connection:
15-
def __init__(self, id_, protocol_obj, buffer_mgr, receptor):
15+
def __init__(self, id_, meta, protocol_obj, buffer_mgr, receptor):
1616
self.id_ = id_
17+
self.meta = meta
1718
self.protocol_obj = protocol_obj
1819
self.buffer_mgr = buffer_mgr
1920
self.receptor = receptor
@@ -53,6 +54,8 @@ def send_route_advertisement(self, edges=None, seen=[]):
5354
buf.push(json.dumps({
5455
"cmd": "ROUTE",
5556
"id": self.receptor.node_id,
57+
"capabilities": self.receptor.work_manager.get_capabilities(),
58+
"groups": self.receptor.config.node_groups,
5659
"edges": edges,
5760
"seen": seens
5861
}).encode("utf-8"))

receptor/messages/directive.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import datetime
22
import logging
3+
import json
34

45
from ..exceptions import UnknownDirective
56
from . import envelope
@@ -21,7 +22,7 @@ async def __call__(self, router, inner_env):
2122
if action not in self.CONTROL_DIRECTIVES:
2223
raise UnknownDirective(f'Unknown control directive: {action}')
2324
action_method = getattr(self, action)
24-
responses = action_method(inner_env)
25+
responses = action_method(router.receptor, inner_env)
2526
serial = 0
2627
async for response in responses:
2728
serial += 1
@@ -34,9 +35,12 @@ async def __call__(self, router, inner_env):
3435
)
3536
await router.send(enveloped_response)
3637

37-
async def ping(self, inner_env):
38+
async def ping(self, receptor, inner_env):
3839
logger.info(f'Received ping from {inner_env.sender}')
39-
yield f'{inner_env.raw_payload}|{datetime.datetime.utcnow().isoformat()}'
40+
return_data = dict(initial_time=inner_env.raw_payload,
41+
response_time=str(datetime.datetime.utcnow()),
42+
active_work=receptor.work_manager.get_work())
43+
yield json.dumps(return_data)
4044

4145

4246
control = Control()

receptor/protocol.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,11 +97,10 @@ async def wait_greeting(self):
9797
self.transport.close()
9898
await asyncio.sleep(.1)
9999
logger.debug("handshake complete, starting normal handle loop")
100-
self.loop.create_task(self.connection.message_handler(self.incoming_buffer)) # Duplicated (see handle_handshake)?
101100

102101
def handle_handshake(self, data):
103102
self.greeted = True
104-
self.connection = self.receptor.add_connection(data["id"], self)
103+
self.connection = self.receptor.add_connection(data["id"], data.get("meta", {}), self)
105104
self.loop.create_task(self.watch_queue(data["id"], self.transport))
106105
self.loop.create_task(self.connection.message_handler(self.incoming_buffer))
107106

@@ -110,6 +109,9 @@ def send_handshake(self):
110109
"cmd": "HI",
111110
"id": self.receptor.node_id,
112111
"expire_time": time.time() + 10,
112+
"meta": dict(capabilities=self.receptor.work_manager.get_capabilities(),
113+
groups=self.receptor.config.node_groups,
114+
work=self.receptor.work_manager.get_work())
113115
}).encode("utf-8")
114116
self.transport.write(msg + DELIM)
115117

@@ -221,4 +223,3 @@ def _data_received_callback(self, inner_env, fut):
221223
code=1,
222224
)
223225
self.emit_response(err_resp)
224-

receptor/receptor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,9 +93,9 @@ def update_connections(self, connection):
9393
self.connections[connection.id_] = [connection]
9494
self.update_connection_manifest(connection.id_)
9595

96-
def add_connection(self, id_, protocol_obj):
96+
def add_connection(self, id_, meta, protocol_obj):
9797
buffer_mgr = self.config.components_buffer_manager
98-
conn = Connection(id_, protocol_obj, buffer_mgr, self)
98+
conn = Connection(id_, meta, protocol_obj, buffer_mgr, self)
9999
self.update_connections(conn)
100100
return conn
101101

receptor/work.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
class WorkManager:
1111
def __init__(self, receptor):
1212
self.receptor = receptor
13+
self.active_work = []
1314

1415
def load_receptor_worker(self, name):
1516
entry_points = [x for x in filter(lambda x: x.name == name,
@@ -18,6 +19,23 @@ def load_receptor_worker(self, name):
1819
raise exceptions.UnknownDirective(f"Error loading directive handlers for {name}")
1920
return entry_points[0].load()
2021

22+
def get_capabilities(self):
23+
return [(x.name, pkg_resources.get_distribution(x.resolve().__package__).version)
24+
for x in pkg_resources.iter_entry_points('receptor.worker')]
25+
26+
def get_work(self):
27+
return self.active_work
28+
29+
def add_work(self, env):
30+
self.active_work.append(dict(id=env.message_id,
31+
directive=env.directive,
32+
sender=env.sender))
33+
34+
def remove_work(self, env):
35+
for work in self.active_work:
36+
if env.message_id == work["id"]:
37+
self.active_work.remove(work)
38+
2139
async def handle(self, inner_env):
2240
logger.info(f'Handling work for {inner_env.message_id} as {inner_env.directive}')
2341
namespace, action = inner_env.directive.split(':', 1)
@@ -29,8 +47,8 @@ async def handle(self, inner_env):
2947
except AttributeError:
3048
logger.exception(f'Could not load action {action} from {namespace}')
3149
raise exceptions.InvalidDirectiveAction(f'Invalid action {action} for {namespace}')
32-
33-
responses = action_method(inner_env)
50+
self.add_work(inner_env)
51+
responses = action_method(inner_env, self.receptor.config.plugins.get("namespace", {}))
3452
async for response in responses:
3553
serial += 1
3654
logger.debug(f'Response emitted for {inner_env.message_id}, serial {serial}')
@@ -54,5 +72,6 @@ async def handle(self, inner_env):
5472
serial=serial,
5573
code=1,
5674
)
75+
self.remove_work(inner_env)
5776
await self.receptor.router.send(enveloped_response)
5877

0 commit comments

Comments
 (0)