Skip to content

Commit de09b51

Browse files
authored
Merge pull request #22 from jimi-c/handle_worker_errors
Handle worker errors and send an error response
2 parents ca2bd8a + 956aa13 commit de09b51

4 files changed

Lines changed: 36 additions & 20 deletions

File tree

receptor/messages/envelope.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ def serialize(self):
3737

3838
class InnerEnvelope:
3939
def __init__(self, receptor, message_id, sender, recipient, message_type, timestamp,
40-
raw_payload, directive=None, in_response_to=None, ttl=None,
41-
serial=1, expire_time_delta=300):
40+
raw_payload, directive=None, in_response_to=None, ttl=None, serial=1,
41+
code=0, expire_time_delta=300):
4242
self.receptor = receptor
4343
self.message_id = message_id
4444
self.sender = sender
@@ -53,6 +53,7 @@ def __init__(self, receptor, message_id, sender, recipient, message_type, timest
5353
self.expire_time = None
5454
self.expire_time = time.time() + expire_time_delta
5555
self.serial = serial # serial index of responses
56+
self.code = code # optional code indicating an error
5657

5758
@classmethod
5859
async def deserialize(cls, receptor, msg):
@@ -62,7 +63,7 @@ async def deserialize(cls, receptor, msg):
6263
return cls(receptor=receptor, **json.loads(payload))
6364

6465
@classmethod
65-
def make_response(cls, receptor, recipient, payload, in_response_to, serial, ttl=None):
66+
def make_response(cls, receptor, recipient, payload, in_response_to, serial, ttl=None, code=0):
6667
if isinstance(payload, bytes):
6768
encoded_payload = base64.encodebytes(payload)
6869
else:
@@ -78,7 +79,8 @@ def make_response(cls, receptor, recipient, payload, in_response_to, serial, ttl
7879
directive=None,
7980
in_response_to=in_response_to,
8081
ttl=ttl,
81-
serial=serial
82+
serial=serial,
83+
code=code,
8284
)
8385

8486
def sign_and_serialize(self):

receptor/protocol.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,8 @@ def connection_lost(self, exc):
169169
def emit_response(self, response):
170170
self.transport.write(json.dumps(dict(timestamp=response.timestamp,
171171
in_response_to=response.in_response_to,
172-
payload=response.raw_payload)).encode())
172+
payload=response.raw_payload,
173+
code=response.code)).encode())
173174

174175
def data_received(self, data):
175176
recipient, directive, payload = data.rstrip(DELIM).decode('utf8').split('\n', 2)
@@ -184,7 +185,7 @@ def data_received(self, data):
184185
message_type='directive',
185186
timestamp=sent_timestamp.isoformat(),
186187
raw_payload=payload,
187-
directive=directive
188+
directive=directive,
188189
)
189190
# TODO: Persistent registry?
190191
self.loop.create_task(self.receptor.router.send(inner_env,

receptor/security/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,6 @@ async def sign_response(self, inner_envelope):
2525
{attr: getattr(inner_envelope, attr)
2626
for attr in ['message_id', 'sender', 'recipient', 'message_type',
2727
'timestamp', 'raw_payload', 'directive',
28-
'in_response_to', 'ttl', 'serial']}
28+
'in_response_to', 'ttl', 'serial', 'code']}
2929
)
3030

receptor/work.py

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
import pkg_resources
21
import logging
2+
import pkg_resources
33

44
from . import exceptions
55
from .messages import envelope
@@ -20,24 +20,37 @@ def load_receptor_worker(self, name):
2020
async def handle(self, inner_env):
2121
logger.info(f'Handling work for {inner_env.message_id} as {inner_env.directive}')
2222
namespace, action = inner_env.directive.split(':', 1)
23-
worker_module = self.load_receptor_worker(namespace)
24-
try:
25-
action_method = getattr(worker_module, f'{action}')
26-
except AttributeError:
27-
logger.exception(f'Could not load action {action} from {namespace}')
28-
raise exceptions.InvalidDirectiveAction(f'Invalid action {action} for {namespace}')
29-
responses = action_method(inner_env)
3023
serial = 0
31-
async for response in responses:
24+
try:
25+
worker_module = self.load_receptor_worker(namespace)
26+
try:
27+
action_method = getattr(worker_module, f'{action}')
28+
except AttributeError:
29+
logger.exception(f'Could not load action {action} from {namespace}')
30+
raise exceptions.InvalidDirectiveAction(f'Invalid action {action} for {namespace}')
31+
32+
responses = action_method(inner_env)
33+
async for response in responses:
34+
serial += 1
35+
logger.debug(f'Response emitted for {inner_env.message_id}, serial {serial}')
36+
enveloped_response = envelope.InnerEnvelope.make_response(
37+
receptor=self.receptor,
38+
recipient=inner_env.sender,
39+
payload=response,
40+
in_response_to=inner_env.message_id,
41+
serial=serial
42+
)
43+
await self.receptor.router.send(enveloped_response)
44+
except Exception as e:
3245
serial += 1
33-
logger.debug(f'Response emitted for {inner_env.message_id}, serial {serial}')
46+
logger.error(f'Error encountered while handling the response, replying with an error message ({e})')
3447
enveloped_response = envelope.InnerEnvelope.make_response(
3548
receptor=self.receptor,
3649
recipient=inner_env.sender,
37-
payload=response,
50+
payload=str(e),
3851
in_response_to=inner_env.message_id,
39-
serial=serial
52+
serial=serial,
53+
code=1,
4054
)
4155
await self.receptor.router.send(enveloped_response)
4256

43-

0 commit comments

Comments
 (0)