Skip to content

Commit d4edece

Browse files
committed
Switch node to use new module interface
Rename some confusing things
1 parent 78714b3 commit d4edece

5 files changed

Lines changed: 40 additions & 102 deletions

File tree

receptor/config.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ def __init__(self, args=None):
154154
)
155155
self.add_config_option(
156156
section='node',
157-
key='ping_interval',
157+
key='keepalive_interval',
158158
default_value=-1,
159159
value_type='int',
160160
hint='If specified, the node will ping all other known nodes in the mesh every N seconds. The default is -1, meaning no pings are sent.',

receptor/controller.py

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import datetime
33
import logging
44
import uuid
5+
from urllib.parse import urlparse
56

67
from .ws import WSServer
78
from .protocol import BasicProtocol, create_peer
@@ -11,13 +12,6 @@
1112
logger = logging.getLogger(__name__)
1213

1314

14-
def connect_to_socket(socket_path):
15-
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
16-
sock.connect(socket_path)
17-
return sock
18-
19-
20-
# TODO: track stats
2115
class Controller:
2216

2317
def __init__(self, config, loop=asyncio.get_event_loop(), queue=None):
@@ -46,16 +40,17 @@ def enable_websocket_server(self, listen_address, listen_port):
4640
self.loop.create_task(listener)
4741

4842
async def add_peer(self, peer):
49-
# NOTE: Not signing or serializing
43+
if "://" not in peer:
44+
peer = f"receptor://{peer}"
45+
peer = urlparse(peer)
5046
logger.info("Connecting to peer {}".format(peer))
51-
await self.loop.create_task(create_peer(self.receptor, self.loop,
52-
*peer.strip().split(":", 1)))
47+
await self.loop.create_task(create_peer(self.receptor, self.loop, peer.hostname, peer.port))
5348

5449
async def recv(self):
5550
inner = await self.receptor.response_queue.get()
5651
return inner.raw_payload
5752

58-
async def send(self, message):
53+
async def send(self, message, expect_response=True):
5954
inner_env = envelope.Inner(
6055
receptor=self.receptor,
6156
message_id=str(uuid.uuid4()),
@@ -66,16 +61,16 @@ async def send(self, message):
6661
timestamp=datetime.datetime.utcnow().isoformat(),
6762
raw_payload=message.fd.read(),
6863
)
69-
await self.receptor.router.send(inner_env, expected_response=True)
64+
await self.receptor.router.send(inner_env, expected_response=expect_response)
7065

71-
async def ping(self, destination):
72-
await self.receptor.router.ping_node(destination)
66+
async def ping(self, destination, expected_response=True):
67+
await self.receptor.router.ping_node(destination, expected_response)
7368

74-
def run(self, coro=None):
69+
def run(self, app=None):
7570
try:
76-
if coro is None:
77-
coro = self.receptor.shutdown_handler
78-
self.loop.run_until_complete(coro())
71+
if app is None:
72+
app = self.receptor.shutdown_handler
73+
self.loop.run_until_complete(app())
7974
except KeyboardInterrupt:
8075
pass
8176
finally:

receptor/entrypoints.py

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,34 @@
44

55
from prometheus_client import start_http_server
66

7-
from .receptor import Receptor
87
from .controller import Controller
9-
from . import node
108

119
logger = logging.getLogger(__name__)
1210

1311

1412
def run_as_node(config):
15-
receptor = Receptor(config)
16-
logger.info(f'Running as Receptor node with ID: {receptor.node_id}')
13+
async def node_keepalive():
14+
for node_id in controller.receptor.router.get_nodes():
15+
await controller.ping(node_id, expected_response=False)
16+
absolute_call_time = (((int(controller.loop.time()) + 1) // config.node_keepalive_interval) + 1) * config.node_keepalive_interval
17+
controller.loop.call_at(absolute_call_time,
18+
controller.loop.create_task,
19+
node_keepalive())
20+
21+
controller = Controller(config)
22+
logger.info(f'Running as Receptor node with ID: {controller.receptor.node_id}')
1723
if config.node_stats_enable:
1824
logger.info(f'Starting stats on port {config.node_stats_port}')
1925
start_http_server(config.node_stats_port)
20-
node.mainloop(receptor, config.node_ping_interval)
26+
if not config.node_server_disable:
27+
controller.enable_server(config.node_listen_address,
28+
config.node_listen_port)
29+
for peer in config.node_peers:
30+
controller.loop.create_task(controller.add_peer(peer))
31+
if config.node_keepalive_interval > 1:
32+
controller.loop.create_task(node_keepalive())
33+
controller.loop.create_task(controller.receptor.watch_expire())
34+
controller.run()
2135

2236

2337
def run_as_controller(config):
@@ -27,6 +41,7 @@ def run_as_controller(config):
2741
start_http_server(config.controller_stats_port)
2842
controller.enable_server(config.controller_listen_address,
2943
config.controller_listen_port)
44+
controller.loop.create_task(controller.receptor.watch_expire())
3045
controller.run()
3146

3247

@@ -39,28 +54,28 @@ def ping_iter():
3954
while True:
4055
yield 0
4156

42-
async def handle_ping():
57+
async def ping_entrypoint():
4358
read_task = controller.loop.create_task(read_responses())
4459
await controller.add_peer(config.ping_peer)
4560
start_wait = time.time()
4661
while not controller.receptor.router.node_is_known(config.ping_recipient) and (time.time() - start_wait < 5):
4762
await asyncio.sleep(0.1)
48-
await do_ping()
63+
await send_pings()
4964
await read_task
5065

5166
async def read_responses():
5267
for _ in ping_iter():
5368
payload = await controller.recv()
5469
print("{}".format(payload))
5570

56-
async def do_ping():
71+
async def send_pings():
5772
for _ in ping_iter():
5873
await controller.ping(config.ping_recipient)
5974
await asyncio.sleep(config.ping_delay)
6075

6176
logger.info(f'Sending ping to {config.ping_recipient} via {config.ping_peer}.')
6277
controller = Controller(config)
63-
controller.run(handle_ping)
78+
controller.run(ping_entrypoint)
6479

6580

6681
def run_as_send(config):

receptor/node.py

Lines changed: 0 additions & 60 deletions
This file was deleted.

receptor/router.py

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,25 +5,13 @@
55
import uuid
66
from collections import defaultdict
77

8-
from dateutil import parser
9-
108
from .exceptions import ReceptorBufferError, UnrouteableError
119
from .messages import envelope
1210
from .stats import route_counter
1311

1412
logger = logging.getLogger(__name__)
1513

1614

17-
async def log_ping(response):
18-
pong_received = datetime.datetime.utcnow()
19-
ping_sent, ping_received = response.raw_payload.split('|')
20-
ping_time = parser.parse(ping_received) - parser.parse(ping_sent)
21-
pong_time = pong_received - parser.parse(ping_received)
22-
logger.info(f'Ping report for {response.sender}: '
23-
f'ping={ping_time.total_seconds()}s; '
24-
f'pong={pong_time.total_seconds()}s')
25-
26-
2715
class MeshRouter:
2816
_nodes = set()
2917
_edges = set()
@@ -94,7 +82,7 @@ def get_edges(self):
9482
def get_nodes(self):
9583
return self._nodes
9684

97-
async def ping_node(self, node_id, callback=log_ping):
85+
async def ping_node(self, node_id, expected_response=True):
9886
logger.info(f'Sending ping to node {node_id}')
9987
now = datetime.datetime.utcnow().isoformat()
10088
ping_envelope = envelope.Inner(
@@ -108,7 +96,7 @@ async def ping_node(self, node_id, callback=log_ping):
10896
directive='receptor:ping',
10997
ttl=15
11098
)
111-
await self.send(ping_envelope, callback)
99+
await self.send(ping_envelope, expected_response)
112100

113101
def find_shortest_path(self, to_node_id):
114102
"""Implementation of Dijkstra algorithm"""

0 commit comments

Comments
 (0)