Skip to content

Commit bdc9dd6

Browse files
committed
Handle errors on sending messages on the controller
1 parent d46c621 commit bdc9dd6

2 files changed

Lines changed: 30 additions & 5 deletions

File tree

receptor/entrypoints.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ def run_as_ping(config):
2828
response = controller.send_directive('receptor:ping', config.ping_recipient, now.isoformat(), sock)
2929
resp_json = json.loads(response)
3030
if 'code' in resp_json and resp_json['code'] != 0:
31-
sys.stdout.buffer.write(b"Failed to ping node.\n")
31+
sys.stdout.buffer.write(b"Failed to ping node: %b\n" % (resp_json['payload'].encode('utf-8'),))
3232
else:
3333
sys.stdout.buffer.write(response + b"\n")
3434
sys.stdout.flush()

receptor/protocol.py

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1-
import datetime
21
import asyncio
2+
import datetime
3+
import functools
4+
import json
35
import logging
46
import time
5-
import json
67
import uuid
8+
79
from collections import deque
810
from .messages import envelope
911
from .exceptions import ReceptorBufferError
@@ -188,5 +190,28 @@ def data_received(self, data):
188190
directive=directive,
189191
)
190192
# TODO: Persistent registry?
191-
self.loop.create_task(self.receptor.router.send(inner_env,
192-
expected_response=True))
193+
send_task = self.loop.create_task(
194+
self.receptor.router.send(
195+
inner_env,
196+
expected_response=True
197+
)
198+
)
199+
send_task.add_done_callback(
200+
functools.partial(self.data_received_callback, inner_env)
201+
)
202+
203+
def data_received_callback(self, inner_env, fut):
204+
try:
205+
fut.result()
206+
except Exception as e:
207+
self.transport.write(
208+
json.dumps(
209+
dict(
210+
timestamp=datetime.datetime.utcnow().isoformat(),
211+
in_response_to=inner_env.message_id,
212+
payload=str(e),
213+
code=1,
214+
)
215+
).encode()
216+
)
217+

0 commit comments

Comments
 (0)