Skip to content

Commit d46c621

Browse files
committed
Make send_directive more flexible
1 parent 28137a0 commit d46c621

2 files changed

Lines changed: 41 additions & 19 deletions

File tree

receptor/controller.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,19 +9,18 @@
99
logger = logging.getLogger(__name__)
1010

1111

12-
def send_directive(directive, recipient, payload, socket_path, expected_responses=1):
13-
if payload == '-':
14-
payload = sys.stdin.read()
12+
def connect_to_socket(socket_path):
1513
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1614
sock.connect(socket_path)
15+
return sock
16+
17+
def send_directive(directive, recipient, payload, sock):
18+
if payload == '-':
19+
payload = sys.stdin.read()
1720
sock.sendall(f"{recipient}\n{directive}\n{payload}".encode('utf-8') + protocol.DELIM)
1821
response = b''
19-
response_count = 0
20-
while response_count < expected_responses:
21-
response = sock.recv(4096)
22-
sys.stdout.buffer.write(response + b"\n")
23-
sys.stdout.flush()
24-
response_count += 1
22+
response = sock.recv(4096)
23+
return response
2524

2625
# FIXME: the socket path is in the config, it shouldn't need to be passed as an arg here
2726
def mainloop(receptor, socket_path, loop=asyncio.get_event_loop()):

receptor/entrypoints.py

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import datetime
2+
import json
23
import logging
4+
import sys
35
import time
46

57
from .receptor import Receptor
@@ -17,20 +19,41 @@ def run_as_controller(config):
1719

1820
def run_as_ping(config):
1921
logger.info(f'Sending ping to {config.ping_recipient}.')
20-
now = datetime.datetime.utcnow()
21-
pings_sent = 0
22-
while True:
23-
controller.send_directive('receptor:ping', config.ping_recipient, now.isoformat(), config.ping_socket_path)
24-
pings_sent += 1
25-
if config.ping_count != 0 and pings_sent >= config.ping_count:
26-
break
27-
elif config.ping_delay > 0.0:
28-
time.sleep(config.ping_delay)
22+
sock = controller.connect_to_socket(config.ping_socket_path)
23+
24+
try:
25+
pings_sent = 0
26+
while True:
27+
now = datetime.datetime.utcnow()
28+
response = controller.send_directive('receptor:ping', config.ping_recipient, now.isoformat(), sock)
29+
resp_json = json.loads(response)
30+
if 'code' in resp_json and resp_json['code'] != 0:
31+
sys.stdout.buffer.write(b"Failed to ping node.\n")
32+
else:
33+
sys.stdout.buffer.write(response + b"\n")
34+
sys.stdout.flush()
35+
pings_sent += 1
36+
if config.ping_count != 0 and pings_sent >= config.ping_count:
37+
break
38+
elif config.ping_delay > 0.0:
39+
time.sleep(config.ping_delay)
40+
except KeyboardInterrupt:
41+
pass
42+
finally:
43+
sock.close()
2944

3045

3146
def run_as_send(config):
3247
logger.info(f'Sending a {config.send_directive} directive to {config.send_recipient}.')
33-
controller.send_directive(config.send_directive, config.send_recipient, config.send_payload, config.send_socket_path)
48+
sock = controller.connect_to_socket(config.send_socket_path)
49+
try:
50+
response = controller.send_directive(config.send_directive, config.send_recipient, config.send_payload, sock)
51+
sys.stdout.buffer.write(response + b"\n")
52+
sys.stdout.flush()
53+
except KeyboardInterrupt:
54+
pass
55+
finally:
56+
sock.close()
3457

3558

3659
def run_as_node(config):

0 commit comments

Comments
 (0)