Skip to content

Commit d611cd6

Browse files
committed
Adding metadaa and work status
1 parent 4c922d6 commit d611cd6

4 files changed

Lines changed: 18 additions & 5 deletions

File tree

receptor/connection.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,9 @@
1212

1313

1414
class Connection:
15-
def __init__(self, id_, protocol_obj, buffer_mgr, receptor):
15+
def __init__(self, id_, meta, protocol_obj, buffer_mgr, receptor):
1616
self.id_ = id_
17+
self.meta = meta
1718
self.protocol_obj = protocol_obj
1819
self.buffer_mgr = buffer_mgr
1920
self.receptor = receptor

receptor/protocol.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ async def wait_greeting(self):
100100

101101
def handle_handshake(self, data):
102102
self.greeted = True
103-
self.connection = self.receptor.add_connection(data["id"], self)
103+
self.connection = self.receptor.add_connection(data["id"], data.get("meta", {}), self)
104104
self.loop.create_task(self.watch_queue(data["id"], self.transport))
105105
self.loop.create_task(self.connection.message_handler(self.incoming_buffer))
106106

receptor/receptor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,9 +93,9 @@ def update_connections(self, connection):
9393
self.connections[connection.id_] = [connection]
9494
self.update_connection_manifest(connection.id_)
9595

96-
def add_connection(self, id_, protocol_obj):
96+
def add_connection(self, id_, meta, protocol_obj):
9797
buffer_mgr = self.config.components_buffer_manager
98-
conn = Connection(id_, protocol_obj, buffer_mgr, self)
98+
conn = Connection(id_, meta, protocol_obj, buffer_mgr, self)
9999
self.update_connections(conn)
100100
return conn
101101

receptor/work.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
class WorkManager:
1111
def __init__(self, receptor):
1212
self.receptor = receptor
13+
self.active_work = []
1314

1415
def load_receptor_worker(self, name):
1516
entry_points = [x for x in filter(lambda x: x.name == name,
@@ -21,6 +22,16 @@ def load_receptor_worker(self, name):
2122
def get_capabilities(self):
2223
return [x.name for x in pkg_resources.iter_entry_points('receptor.worker')]
2324

25+
def add_work(self, env):
26+
self.active_work.append(dict(id=env.message_id,
27+
directive=env.directive,
28+
sender=env.sender))
29+
30+
def remove_work(self, env):
31+
for work in self.active_work:
32+
if env.message_id == work["id"]:
33+
self.active_work.remove(work)
34+
2435
async def handle(self, inner_env):
2536
logger.info(f'Handling work for {inner_env.message_id} as {inner_env.directive}')
2637
namespace, action = inner_env.directive.split(':', 1)
@@ -32,7 +43,7 @@ async def handle(self, inner_env):
3243
except AttributeError:
3344
logger.exception(f'Could not load action {action} from {namespace}')
3445
raise exceptions.InvalidDirectiveAction(f'Invalid action {action} for {namespace}')
35-
46+
self.add_work(inner_env)
3647
responses = action_method(inner_env)
3748
async for response in responses:
3849
serial += 1
@@ -57,5 +68,6 @@ async def handle(self, inner_env):
5768
serial=serial,
5869
code=1,
5970
)
71+
self.remove_work(inner_env)
6072
await self.receptor.router.send(enveloped_response)
6173

0 commit comments

Comments
 (0)