Skip to content

Commit f470827

Browse files
committed
Cleaning up controller emit_response
* Use sign_and_serialize for consistency and future expandability * Clean up entrypoint code
1 parent 8625092 commit f470827

2 files changed

Lines changed: 26 additions & 22 deletions

File tree

receptor/entrypoints.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,12 @@ def run_as_controller(config):
1717
controller.mainloop(receptor, config.controller_socket_path)
1818

1919

20+
def run_as_node(config):
21+
receptor = Receptor(config)
22+
logger.info(f'Running as Receptor node with ID: {receptor.node_id}')
23+
node.mainloop(receptor, config.node_ping_interval)
24+
25+
2026
def run_as_ping(config):
2127
logger.info(f'Sending ping to {config.ping_recipient}.')
2228
sock = controller.connect_to_socket(config.ping_socket_path)
@@ -28,7 +34,7 @@ def run_as_ping(config):
2834
response = controller.send_directive('receptor:ping', config.ping_recipient, now.isoformat(), sock)
2935
resp_json = json.loads(response)
3036
if 'code' in resp_json and resp_json['code'] != 0:
31-
sys.stdout.buffer.write(b"Failed to ping node: %b\n" % (resp_json['payload'].encode('utf-8'),))
37+
sys.stdout.buffer.write(b"Failed to ping node: %b\n" % (resp_json['raw_payload'].encode('utf-8'),))
3238
else:
3339
sys.stdout.buffer.write(response + b"\n")
3440
sys.stdout.flush()
@@ -54,9 +60,3 @@ def run_as_send(config):
5460
pass
5561
finally:
5662
sock.close()
57-
58-
59-
def run_as_node(config):
60-
receptor = Receptor(config)
61-
logger.info(f'Running as Receptor node with ID: {receptor.node_id}')
62-
node.mainloop(receptor, config.node_ping_interval)

receptor/protocol.py

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -170,10 +170,14 @@ def connection_lost(self, exc):
170170
self.receptor.controller_connections.remove(self)
171171

172172
def emit_response(self, response):
173-
self.transport.write(json.dumps(dict(timestamp=response.timestamp,
174-
in_response_to=response.in_response_to,
175-
payload=response.raw_payload,
176-
code=response.code)).encode() + DELIM)
173+
emit_task = self.loop.create_task(response.sign_and_serialize())
174+
emit_task.add_done_callback(
175+
functools.partial(self._do_emit_callback)
176+
)
177+
178+
def _do_emit_callback(self, fut):
179+
res = fut.result()
180+
self.transport.write(res.encode() + DELIM)
177181

178182
def data_received(self, data):
179183
recipient, directive, payload = data.rstrip(DELIM).decode('utf8').split('\n', 2)
@@ -198,21 +202,21 @@ def data_received(self, data):
198202
)
199203
)
200204
send_task.add_done_callback(
201-
functools.partial(self.data_received_callback, inner_env)
205+
functools.partial(self._data_received_callback, inner_env)
202206
)
203207

204-
def data_received_callback(self, inner_env, fut):
208+
def _data_received_callback(self, inner_env, fut):
205209
try:
206210
fut.result()
207211
except Exception as e:
208-
self.transport.write(
209-
json.dumps(
210-
dict(
211-
timestamp=datetime.datetime.utcnow().isoformat(),
212-
in_response_to=inner_env.message_id,
213-
payload=str(e),
214-
code=1,
215-
)
216-
).encode() + DELIM
212+
err_resp = envelope.InnerEnvelope.make_response(
213+
receptor=self.receptor,
214+
recipient=inner_env.sender,
215+
payload=str(e),
216+
in_response_to=inner_env.message_id,
217+
ttl=inner_env.ttl,
218+
serial=inner_env.serial,
219+
code=1,
217220
)
221+
self.emit_response(err_resp)
218222

0 commit comments

Comments
 (0)