Skip to content

Commit 9e27ac9

Browse files
committed
Staging module changes
1 parent 0ae97cf commit 9e27ac9

10 files changed

Lines changed: 243 additions & 232 deletions

File tree

receptor/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
from .controller import Controller # noqa
2+
from .config import ReceptorConfig # noqa

receptor/__main__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99

1010
def main(args=None):
11-
11+
1212
try:
1313
config = ReceptorConfig(args)
1414
except Exception as e:

receptor/config.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import os
66
import ssl
77

8-
from .entrypoints import run_as_node, run_as_controller, run_as_ping, run_as_send
8+
from .entrypoints import run_as_node, run_as_ping, run_as_send, run_as_controller
99
from .exceptions import ReceptorRuntimeError, ReceptorConfigError
1010

1111
logger = logging.getLogger(__name__)
@@ -18,7 +18,7 @@
1818
},
1919
'controller': {
2020
'hint': 'Run a Receptor controller',
21-
'entrypoint': run_as_controller,
21+
'entrypoint': run_as_controller, # TODO: New entrypoint
2222
},
2323
'ping': {
2424
'hint': 'Tell the local controller to ping a node',
@@ -200,12 +200,19 @@ def __init__(self, args=None):
200200
hint='Set/override controller node identifier. If unspecified here or in a config file, one will be automatically generated.',
201201
)
202202
# Ping options
203+
# self.add_config_option(
204+
# section='ping',
205+
# key='socket_path',
206+
# default_value='/var/run/receptor_controller.sock',
207+
# value_type='path',
208+
# hint='Path to control socket for controller commands.',
209+
# )
203210
self.add_config_option(
204211
section='ping',
205-
key='socket_path',
206-
default_value='/var/run/receptor_controller.sock',
207-
value_type='path',
208-
hint='Path to control socket for controller commands.',
212+
key='peer',
213+
default_value='',
214+
value_type='str',
215+
hint='The peer to relay the ping directive through'
209216
)
210217
self.add_config_option(
211218
section='controller',

receptor/controller.py

Lines changed: 66 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
import asyncio
2+
import datetime
23
import logging
3-
import os
4-
import socket
5-
import sys
4+
import uuid
65

7-
from . import protocol
86
from .ws import WSServer
7+
from .protocol import BasicProtocol, create_peer
8+
from .receptor import Receptor
9+
from .messages import envelope
910

1011
logger = logging.getLogger(__name__)
1112

@@ -16,47 +17,64 @@ def connect_to_socket(socket_path):
1617
return sock
1718

1819

19-
def send_directive(directive, recipient, payload, sock):
20-
if payload == '-':
21-
payload = sys.stdin.read()
22-
sock.sendall(f"{recipient}\n{directive}\n{payload}".encode('utf-8') + protocol.DELIM)
23-
response = b''
24-
while True:
25-
received = sock.recv(1024)
26-
done = protocol.DELIM in received
27-
response += received.rstrip(protocol.DELIM)
28-
if done:
29-
break
30-
31-
return response
32-
33-
34-
# FIXME: the socket path is in the config, it shouldn't need to be passed as an arg here
35-
def mainloop(receptor, socket_path, loop=asyncio.get_event_loop()):
36-
config = receptor.config
37-
listener = loop.create_server(
38-
lambda: protocol.BasicProtocol(receptor, loop),
39-
config.controller_listen_address, config.controller_listen_port, ssl=config.get_server_ssl_context())
40-
logger.info("Serving on %s:%s", config.controller_listen_address, config.controller_listen_port)
41-
loop.create_task(listener)
42-
43-
ws_server = WSServer(receptor, loop)
44-
ws_listener = loop.create_server(ws_server.app().make_handler(),
45-
config.node_listen_address, config.node_listen_port + 1, ssl=config.get_server_ssl_context())
46-
loop.create_task(ws_listener)
47-
logger.info("Serving ws on %s:%s", config.node_listen_address, config.node_listen_port + 1)
48-
49-
control_listener = loop.create_unix_server(
50-
lambda: protocol.BasicControllerProtocol(receptor, loop),
51-
path=socket_path
52-
)
53-
logger.info(f'Opening control socket on {socket_path}')
54-
loop.create_task(control_listener)
55-
loop.create_task(receptor.watch_expire())
56-
try:
57-
loop.run_forever()
58-
except KeyboardInterrupt:
59-
pass
60-
finally:
61-
loop.stop()
62-
os.remove(socket_path)
20+
# TODO: track stats
21+
class Controller:
22+
23+
def __init__(self, config, loop=asyncio.get_event_loop(), queue=None):
24+
self.receptor = Receptor(config)
25+
self.loop = loop
26+
self.queue = queue
27+
if self.queue is None:
28+
self.queue = asyncio.Queue(loop=loop)
29+
self.receptor.response_queue = self.queue
30+
31+
def enable_server(self, listen_address, listen_port):
32+
listener = self.loop.create_server(
33+
lambda: BasicProtocol(self.receptor, self.loop),
34+
listen_address, listen_port,
35+
ssl=self.receptor.config.get_server_ssl_context())
36+
logger.info("Serving on {}:{}".format(listen_address, listen_port))
37+
# TODO: Enable stats?
38+
self.loop.create_task(listener)
39+
40+
def enable_websocket_server(self, listen_address, listen_port):
41+
listener = self.loop.create_server(
42+
WSServer(self.receptor, self.loop).app().make_handler(),
43+
listen_address, listen_port,
44+
ssl=self.receptor.config.get_server_ssl_context())
45+
logger.info("Server ws on {}:{}".format(listen_address, listen_port))
46+
self.loop.create_task(listener)
47+
48+
def add_peer(self, peer):
49+
# NOTE: Not signing or serializing
50+
logger.info("Connecting to peer {}".format(peer))
51+
self.loop.create_task(create_peer(self.receptor, self.loop,
52+
*peer.strip().split(":", 1)))
53+
54+
async def recv(self):
55+
inner = await self.receptor.response_queue.get()
56+
return inner.raw_payload
57+
58+
async def send(self, message):
59+
inner_env = envelope.Inner(
60+
receptor=self.receptor,
61+
messageid=str(uuid.uuid4()),
62+
sender=self.receptor.node_id,
63+
recipient=message.recipient,
64+
message_type="directive",
65+
directive=message.directive,
66+
timestamp=datetime.datetime.utcnow().isoformat(),
67+
raw_payload=message.fd.read(),
68+
)
69+
await self.receptor.router.send(inner_env, expected_response=True)
70+
71+
async def ping(self, destination):
72+
await self.receptor.router.ping_node(destination)
73+
74+
def run(self):
75+
try:
76+
self.loop.run_until_complete(self.receptor.shutdown_handler())
77+
except KeyboardInterrupt:
78+
pass
79+
finally:
80+
self.loop.stop()

receptor/entrypoints.py

Lines changed: 66 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -3,26 +3,18 @@
33
import logging
44
import sys
55
import time
6+
import asyncio
67

78
from prometheus_client import start_http_server
89

910
from .receptor import Receptor
10-
from . import controller
11+
from .controller import Controller
1112
from . import exceptions
1213
from . import node
1314

1415
logger = logging.getLogger(__name__)
1516

1617

17-
def run_as_controller(config):
18-
receptor = Receptor(config)
19-
logger.info(f'Starting up as node ID {receptor.node_id}')
20-
if config.controller_stats_enable:
21-
logger.info(f'Starting stats on port {config.controller_stats_port}')
22-
start_http_server(config.controller_stats_port)
23-
controller.mainloop(receptor, config.controller_socket_path)
24-
25-
2618
def run_as_node(config):
2719
receptor = Receptor(config)
2820
logger.info(f'Running as Receptor node with ID: {receptor.node_id}')
@@ -32,47 +24,72 @@ def run_as_node(config):
3224
node.mainloop(receptor, config.node_ping_interval)
3325

3426

35-
def run_as_ping(config):
36-
logger.info(f'Sending ping to {config.ping_recipient}.')
37-
sock = controller.connect_to_socket(config.ping_socket_path)
27+
def run_as_controller(config):
28+
controller = Controller(config)
29+
controller.enable_server(config.controller_listen_address,
30+
config.controller_listen_port)
31+
controller.run()
32+
3833

39-
try:
40-
pings_sent = 0
34+
def run_as_ping(config):
35+
async def read_responses():
4136
while True:
42-
now = datetime.datetime.utcnow()
43-
response = controller.send_directive('receptor:ping', config.ping_recipient, now.isoformat(), sock)
44-
resp_json = json.loads(response)
45-
if 'code' in resp_json and resp_json['code'] != 0:
46-
sys.stdout.buffer.write(b"Failed to ping node: %b\n" % (resp_json['raw_payload'].encode('utf-8'),))
47-
else:
48-
sys.stdout.buffer.write(response + b"\n")
49-
sys.stdout.flush()
50-
pings_sent += 1
51-
if config.ping_count != 0 and pings_sent >= config.ping_count:
52-
break
53-
elif config.ping_delay > 0.0:
54-
time.sleep(config.ping_delay)
55-
except KeyboardInterrupt:
56-
pass
57-
finally:
58-
sock.close()
37+
payload = await controller.recv()
38+
print("{}".format(payload))
39+
40+
async def do_ping():
41+
await asyncio.sleep(5)
42+
await controller.ping(config.ping_recipient)
43+
44+
logger.info(f'Sending ping to {config.ping_recipient} via {config.ping_peer}.')
45+
controller = Controller(config)
46+
controller.loop.create_task(read_responses())
47+
controller.add_peer(config.ping_peer)
48+
controller.loop.create_task(do_ping())
49+
controller.run()
50+
51+
# def run_as_ping(config):
52+
# logger.info(f'Sending ping to {config.ping_recipient}.')
53+
# sock = controller.connect_to_socket(config.ping_socket_path)
5954

55+
# try:
56+
# pings_sent = 0
57+
# while True:
58+
# now = datetime.datetime.utcnow()
59+
# response = controller.send_directive('receptor:ping', config.ping_recipient, now.isoformat(), sock)
60+
# resp_json = json.loads(response)
61+
# if 'code' in resp_json and resp_json['code'] != 0:
62+
# sys.stdout.buffer.write(b"Failed to ping node: %b\n" % (resp_json['raw_payload'].encode('utf-8'),))
63+
# else:
64+
# sys.stdout.buffer.write(response + b"\n")
65+
# sys.stdout.flush()
66+
# pings_sent += 1
67+
# if config.ping_count != 0 and pings_sent >= config.ping_count:
68+
# break
69+
# elif config.ping_delay > 0.0:
70+
# time.sleep(config.ping_delay)
71+
# except KeyboardInterrupt:
72+
# pass
73+
# finally:
74+
# sock.close()
6075

6176
def run_as_send(config):
62-
logger.info(f'Sending a {config.send_directive} directive to {config.send_recipient}.')
63-
sock = controller.connect_to_socket(config.send_socket_path)
64-
try:
65-
if config.send_directive in (None, ''):
66-
raise exceptions.UnknownDirective("The directive cannot be left out when using send.")
67-
else:
68-
try:
69-
left, right = config.send_directive.split(':', 1)
70-
except ValueError:
71-
raise exceptions.UnknownDirective("Invalid directive format (%s). Directives must be in the form `action:method`." % (config.send_directive,))
72-
response = controller.send_directive(config.send_directive, config.send_recipient, config.send_payload, sock)
73-
sys.stdout.buffer.write(response + b"\n")
74-
sys.stdout.flush()
75-
except KeyboardInterrupt:
76-
pass
77-
finally:
78-
sock.close()
77+
pass
78+
# def run_as_send(config):
79+
# logger.info(f'Sending a {config.send_directive} directive to {config.send_recipient}.')
80+
# sock = controller.connect_to_socket(config.send_socket_path)
81+
# try:
82+
# if config.send_directive in (None, ''):
83+
# raise exceptions.UnknownDirective("The directive cannot be left out when using send.")
84+
# else:
85+
# try:
86+
# left, right = config.send_directive.split(':', 1)
87+
# except ValueError:
88+
# raise exceptions.UnknownDirective("Invalid directive format (%s). Directives must be in the form `action:method`." % (config.send_directive,))
89+
# response = controller.send_directive(config.send_directive, config.send_recipient, config.send_payload, sock)
90+
# sys.stdout.buffer.write(response + b"\n")
91+
# sys.stdout.flush()
92+
# except KeyboardInterrupt:
93+
# pass
94+
# finally:
95+
# sock.close()

0 commit comments

Comments
 (0)