Skip to content

Commit d0a2186

Browse files
authored
Merge pull request #23 from jimi-c/ping_count
Make ping count/delay configurable
2 parents 690acf4 + f470827 commit d0a2186

5 files changed

Lines changed: 127 additions & 30 deletions

File tree

receptor/config.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,20 @@ def __init__(self, args=None):
181181
value_type='path',
182182
hint='Path to control socket for controller commands.',
183183
)
184+
self.add_config_option(
185+
section='ping',
186+
key='count',
187+
default_value=0,
188+
value_type='int',
189+
hint='Number of pings to send. If unspecified here or in a config file pings will be continuously sent until interrupted.',
190+
)
191+
self.add_config_option(
192+
section='ping',
193+
key='delay',
194+
default_value=0,
195+
value_type='float',
196+
hint='The delay (in seconds) to wait between pings. If unspecified here or in a config file pings will be sent as soon as the previous response is received.',
197+
)
184198
self.add_config_option(
185199
section='ping',
186200
key='recipient',
@@ -360,6 +374,8 @@ def _enforce_value_type(self, value, value_type):
360374
return value_type(value)
361375
elif value_type == 'int' and not isinstance(value, int):
362376
return int(value)
377+
elif value_type == 'float' and not isinstance(value, float):
378+
return float(value)
363379
elif value_type == 'str' and not isinstance(value, str):
364380
return '%s' % (value,)
365381
elif value_type == 'bool' and not isinstance(value, bool):

receptor/connection.py

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -68,12 +68,24 @@ async def handle_message(self, msg):
6868
if next_hop is None:
6969
await outer_env.deserialize_inner(self.receptor)
7070
if outer_env.inner_obj.message_type == 'directive':
71-
namespace, _ = outer_env.inner_obj.directive.split(':', 1)
72-
if namespace == RECEPTOR_DIRECTIVE_NAMESPACE:
73-
await directive.control(self.receptor.router, outer_env.inner_obj)
74-
else:
75-
# other namespace/work directives
76-
await self.receptor.work_manager.handle(outer_env.inner_obj)
71+
try:
72+
namespace, _ = outer_env.inner_obj.directive.split(':', 1)
73+
if namespace == RECEPTOR_DIRECTIVE_NAMESPACE:
74+
await directive.control(self.receptor.router, outer_env.inner_obj)
75+
else:
76+
# other namespace/work directives
77+
await self.receptor.work_manager.handle(outer_env.inner_obj)
78+
except Exception as e:
79+
err_resp = outer_env.inner_obj.make_response(
80+
receptor=self.receptor,
81+
recipient=outer_env.inner_obj.sender,
82+
payload=str(e),
83+
in_response_to=outer_env.inner_obj.message_id,
84+
serial=outer_env.inner_obj.serial + 1,
85+
ttl=15,
86+
code=1,
87+
)
88+
await self.receptor.router.send(err_resp)
7789
elif outer_env.inner_obj.message_type == 'response':
7890
in_response_to = outer_env.inner_obj.in_response_to
7991
if in_response_to in self.receptor.router.response_registry:
@@ -87,3 +99,4 @@ async def handle_message(self, msg):
8799
f'Unknown message type: {outer_env.inner_obj.message_type}')
88100
else:
89101
await self.receptor.router.forward(outer_env, next_hop)
102+

receptor/controller.py

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,32 @@
11
import asyncio
22
import logging
3+
import os
34
import socket
45
import sys
5-
import os
66

77
from . import protocol
88

99
logger = logging.getLogger(__name__)
1010

1111

12-
def send_directive(directive, recipient, payload, socket_path):
13-
if payload == '-':
14-
payload = sys.stdin.read()
12+
def connect_to_socket(socket_path):
1513
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1614
sock.connect(socket_path)
15+
return sock
16+
17+
def send_directive(directive, recipient, payload, sock):
18+
if payload == '-':
19+
payload = sys.stdin.read()
1720
sock.sendall(f"{recipient}\n{directive}\n{payload}".encode('utf-8') + protocol.DELIM)
1821
response = b''
1922
while True:
20-
response = sock.recv(4096)
21-
sys.stdout.buffer.write(response + b"\n")
22-
sys.stdout.flush()
23+
received = sock.recv(1024)
24+
done = protocol.DELIM in received
25+
response += received.rstrip(protocol.DELIM)
26+
if done:
27+
break
28+
29+
return response
2330

2431
# FIXME: the socket path is in the config, it shouldn't need to be passed as an arg here
2532
def mainloop(receptor, socket_path, loop=asyncio.get_event_loop()):

receptor/entrypoints.py

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
import datetime
2+
import json
23
import logging
4+
import sys
5+
import time
36

47
from .receptor import Receptor
58
from . import node
@@ -14,18 +17,46 @@ def run_as_controller(config):
1417
controller.mainloop(receptor, config.controller_socket_path)
1518

1619

20+
def run_as_node(config):
21+
receptor = Receptor(config)
22+
logger.info(f'Running as Receptor node with ID: {receptor.node_id}')
23+
node.mainloop(receptor, config.node_ping_interval)
24+
25+
1726
def run_as_ping(config):
1827
logger.info(f'Sending ping to {config.ping_recipient}.')
19-
now = datetime.datetime.utcnow()
20-
controller.send_directive('receptor:ping', config.ping_recipient, now.isoformat(), config.ping_socket_path)
28+
sock = controller.connect_to_socket(config.ping_socket_path)
29+
30+
try:
31+
pings_sent = 0
32+
while True:
33+
now = datetime.datetime.utcnow()
34+
response = controller.send_directive('receptor:ping', config.ping_recipient, now.isoformat(), sock)
35+
resp_json = json.loads(response)
36+
if 'code' in resp_json and resp_json['code'] != 0:
37+
sys.stdout.buffer.write(b"Failed to ping node: %b\n" % (resp_json['raw_payload'].encode('utf-8'),))
38+
else:
39+
sys.stdout.buffer.write(response + b"\n")
40+
sys.stdout.flush()
41+
pings_sent += 1
42+
if config.ping_count != 0 and pings_sent >= config.ping_count:
43+
break
44+
elif config.ping_delay > 0.0:
45+
time.sleep(config.ping_delay)
46+
except KeyboardInterrupt:
47+
pass
48+
finally:
49+
sock.close()
2150

2251

2352
def run_as_send(config):
2453
logger.info(f'Sending a {config.send_directive} directive to {config.send_recipient}.')
25-
controller.send_directive(config.send_directive, config.send_recipient, config.send_payload, config.send_socket_path)
26-
27-
28-
def run_as_node(config):
29-
receptor = Receptor(config)
30-
logger.info(f'Running as Receptor node with ID: {receptor.node_id}')
31-
node.mainloop(receptor, config.node_ping_interval)
54+
sock = controller.connect_to_socket(config.send_socket_path)
55+
try:
56+
response = controller.send_directive(config.send_directive, config.send_recipient, config.send_payload, sock)
57+
sys.stdout.buffer.write(response + b"\n")
58+
sys.stdout.flush()
59+
except KeyboardInterrupt:
60+
pass
61+
finally:
62+
sock.close()

receptor/protocol.py

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
1-
import datetime
21
import asyncio
2+
import datetime
3+
import functools
4+
import json
35
import logging
46
import time
5-
import json
67
import uuid
8+
79
from collections import deque
10+
811
from .messages import envelope
912
from .exceptions import ReceptorBufferError
1013

@@ -169,10 +172,14 @@ def connection_lost(self, exc):
169172
self.receptor.controller_connections.remove(self)
170173

171174
def emit_response(self, response):
172-
self.transport.write(json.dumps(dict(timestamp=response.timestamp,
173-
in_response_to=response.in_response_to,
174-
payload=response.raw_payload,
175-
code=response.code)).encode())
175+
emit_task = self.loop.create_task(response.sign_and_serialize())
176+
emit_task.add_done_callback(
177+
functools.partial(self._do_emit_callback)
178+
)
179+
180+
def _do_emit_callback(self, fut):
181+
res = fut.result()
182+
self.transport.write(res.encode() + DELIM)
176183

177184
def data_received(self, data):
178185
recipient, directive, payload = data.rstrip(DELIM).decode('utf8').split('\n', 2)
@@ -190,5 +197,28 @@ def data_received(self, data):
190197
directive=directive,
191198
)
192199
# TODO: Persistent registry?
193-
self.loop.create_task(self.receptor.router.send(inner_env,
194-
expected_response=True))
200+
send_task = self.loop.create_task(
201+
self.receptor.router.send(
202+
inner_env,
203+
expected_response=True
204+
)
205+
)
206+
send_task.add_done_callback(
207+
functools.partial(self._data_received_callback, inner_env)
208+
)
209+
210+
def _data_received_callback(self, inner_env, fut):
211+
try:
212+
fut.result()
213+
except Exception as e:
214+
err_resp = envelope.InnerEnvelope.make_response(
215+
receptor=self.receptor,
216+
recipient=inner_env.sender,
217+
payload=str(e),
218+
in_response_to=inner_env.message_id,
219+
ttl=inner_env.ttl,
220+
serial=inner_env.serial,
221+
code=1,
222+
)
223+
self.emit_response(err_resp)
224+

0 commit comments

Comments
 (0)