Skip to content

Commit 78714b3

Browse files
committed
Implement a better ping
Add coroutine support to the event loop
1 parent 9e27ac9 commit 78714b3

2 files changed

Lines changed: 33 additions & 59 deletions

File tree

receptor/controller.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,11 @@ def enable_websocket_server(self, listen_address, listen_port):
4545
logger.info("Server ws on {}:{}".format(listen_address, listen_port))
4646
self.loop.create_task(listener)
4747

48-
def add_peer(self, peer):
48+
async def add_peer(self, peer):
4949
# NOTE: Not signing or serializing
5050
logger.info("Connecting to peer {}".format(peer))
51-
self.loop.create_task(create_peer(self.receptor, self.loop,
52-
*peer.strip().split(":", 1)))
51+
await self.loop.create_task(create_peer(self.receptor, self.loop,
52+
*peer.strip().split(":", 1)))
5353

5454
async def recv(self):
5555
inner = await self.receptor.response_queue.get()
@@ -58,7 +58,7 @@ async def recv(self):
5858
async def send(self, message):
5959
inner_env = envelope.Inner(
6060
receptor=self.receptor,
61-
messageid=str(uuid.uuid4()),
61+
message_id=str(uuid.uuid4()),
6262
sender=self.receptor.node_id,
6363
recipient=message.recipient,
6464
message_type="directive",
@@ -71,9 +71,11 @@ async def send(self, message):
7171
async def ping(self, destination):
7272
await self.receptor.router.ping_node(destination)
7373

74-
def run(self):
74+
def run(self, coro=None):
7575
try:
76-
self.loop.run_until_complete(self.receptor.shutdown_handler())
76+
if coro is None:
77+
coro = self.receptor.shutdown_handler
78+
self.loop.run_until_complete(coro())
7779
except KeyboardInterrupt:
7880
pass
7981
finally:

receptor/entrypoints.py

Lines changed: 25 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,11 @@
1-
import datetime
2-
import json
31
import logging
4-
import sys
52
import time
63
import asyncio
74

85
from prometheus_client import start_http_server
96

107
from .receptor import Receptor
118
from .controller import Controller
12-
from . import exceptions
139
from . import node
1410

1511
logger = logging.getLogger(__name__)
@@ -26,70 +22,46 @@ def run_as_node(config):
2622

2723
def run_as_controller(config):
2824
controller = Controller(config)
25+
if config.controller_stats_enable:
26+
logger.info(f'Starting stats on port {config.node_stats_port}')
27+
start_http_server(config.controller_stats_port)
2928
controller.enable_server(config.controller_listen_address,
3029
config.controller_listen_port)
3130
controller.run()
3231

3332

3433
def run_as_ping(config):
34+
def ping_iter():
35+
if config.ping_count:
36+
for x in range(config.ping_count):
37+
yield x
38+
else:
39+
while True:
40+
yield 0
41+
42+
async def handle_ping():
43+
read_task = controller.loop.create_task(read_responses())
44+
await controller.add_peer(config.ping_peer)
45+
start_wait = time.time()
46+
while not controller.receptor.router.node_is_known(config.ping_recipient) and (time.time() - start_wait < 5):
47+
await asyncio.sleep(0.1)
48+
await do_ping()
49+
await read_task
50+
3551
async def read_responses():
36-
while True:
52+
for _ in ping_iter():
3753
payload = await controller.recv()
3854
print("{}".format(payload))
3955

4056
async def do_ping():
41-
await asyncio.sleep(5)
42-
await controller.ping(config.ping_recipient)
57+
for _ in ping_iter():
58+
await controller.ping(config.ping_recipient)
59+
await asyncio.sleep(config.ping_delay)
4360

4461
logger.info(f'Sending ping to {config.ping_recipient} via {config.ping_peer}.')
4562
controller = Controller(config)
46-
controller.loop.create_task(read_responses())
47-
controller.add_peer(config.ping_peer)
48-
controller.loop.create_task(do_ping())
49-
controller.run()
50-
51-
# def run_as_ping(config):
52-
# logger.info(f'Sending ping to {config.ping_recipient}.')
53-
# sock = controller.connect_to_socket(config.ping_socket_path)
63+
controller.run(handle_ping)
5464

55-
# try:
56-
# pings_sent = 0
57-
# while True:
58-
# now = datetime.datetime.utcnow()
59-
# response = controller.send_directive('receptor:ping', config.ping_recipient, now.isoformat(), sock)
60-
# resp_json = json.loads(response)
61-
# if 'code' in resp_json and resp_json['code'] != 0:
62-
# sys.stdout.buffer.write(b"Failed to ping node: %b\n" % (resp_json['raw_payload'].encode('utf-8'),))
63-
# else:
64-
# sys.stdout.buffer.write(response + b"\n")
65-
# sys.stdout.flush()
66-
# pings_sent += 1
67-
# if config.ping_count != 0 and pings_sent >= config.ping_count:
68-
# break
69-
# elif config.ping_delay > 0.0:
70-
# time.sleep(config.ping_delay)
71-
# except KeyboardInterrupt:
72-
# pass
73-
# finally:
74-
# sock.close()
7565

7666
def run_as_send(config):
7767
pass
78-
# def run_as_send(config):
79-
# logger.info(f'Sending a {config.send_directive} directive to {config.send_recipient}.')
80-
# sock = controller.connect_to_socket(config.send_socket_path)
81-
# try:
82-
# if config.send_directive in (None, ''):
83-
# raise exceptions.UnknownDirective("The directive cannot be left out when using send.")
84-
# else:
85-
# try:
86-
# left, right = config.send_directive.split(':', 1)
87-
# except ValueError:
88-
# raise exceptions.UnknownDirective("Invalid directive format (%s). Directives must be in the form `action:method`." % (config.send_directive,))
89-
# response = controller.send_directive(config.send_directive, config.send_recipient, config.send_payload, sock)
90-
# sys.stdout.buffer.write(response + b"\n")
91-
# sys.stdout.flush()
92-
# except KeyboardInterrupt:
93-
# pass
94-
# finally:
95-
# sock.close()

0 commit comments

Comments
 (0)