Skip to content

Commit c5ffbed

Browse files
committed
Integrate websocket changes more cleanly
* Adjusted command line parameters * Adding Controller methods for managing websocket services and peers
1 parent 6a7203b commit c5ffbed

3 files changed

Lines changed: 44 additions & 40 deletions

File tree

receptor/config.py

Lines changed: 17 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import logging
55
import os
66
import ssl
7+
import urllib.parse
78

89
from .entrypoints import run_as_node, run_as_ping, run_as_send, run_as_controller
910
from .exceptions import ReceptorRuntimeError, ReceptorConfigError
@@ -108,17 +109,18 @@ def __init__(self, args=None):
108109
# Receptor node options
109110
self.add_config_option(
110111
section='node',
111-
key='listen_address',
112-
default_value='0.0.0.0',
112+
key='listen',
113+
default_value='receptor://0.0.0.0:8888',
113114
value_type='str',
114-
hint='Set/override IP address to listen on. If not set here or in a config file, the default is 0.0.0.0/0.',
115+
hint='Set/override IP address and port to listen on. If not set here or in a config file, the default is receptor://0.0.0.0:8888.',
115116
)
116117
self.add_config_option(
117118
section='node',
118-
key='listen_port',
119-
default_value=8888,
120-
value_type='int',
121-
hint='Set/override TCP port to listen on. If not set here or in a config file, the default is 8888.',
119+
key='websocket_listen',
120+
default_value='',
121+
value_type='str',
122+
hint='Set IP address and port to listen on for websocket clients in the form ws[s]://addr:port. '
123+
'If not set here or in a config file then it is disabled',
122124
)
123125
self.add_config_option(
124126
section='node',
@@ -179,17 +181,18 @@ def __init__(self, args=None):
179181
)
180182
self.add_config_option(
181183
section='controller',
182-
key='listen_address',
183-
default_value='0.0.0.0',
184+
key='listen',
185+
default_value='receptor://0.0.0.0:8888',
184186
value_type='str',
185-
hint='Set/override IP address to listen on. If not set here or in a config file, the default is 0.0.0.0/0.',
187+
hint='Set IP address and port to listen on. If not set here or in a config file, the default is receptor://0.0.0.0/0:8888.',
186188
)
187189
self.add_config_option(
188190
section='controller',
189-
key='listen_port',
190-
default_value=8888,
191-
value_type='int',
192-
hint='Set/override TCP port to listen on. If not set here or in a config file, the default is 8888.',
191+
key='websocket_listen',
192+
default_value='',
193+
value_type='str',
194+
hint='Set IP address and port to listen on for websocket clients in the form ws://addr:port. '
195+
'If not set here or in a config file then it is disabled',
193196
)
194197
self.add_config_option(
195198
section='controller',
@@ -199,14 +202,6 @@ def __init__(self, args=None):
199202
value_type='str',
200203
hint='Set/override controller node identifier. If unspecified here or in a config file, one will be automatically generated.',
201204
)
202-
# Ping options
203-
# self.add_config_option(
204-
# section='ping',
205-
# key='socket_path',
206-
# default_value='/var/run/receptor_controller.sock',
207-
# value_type='path',
208-
# hint='Path to control socket for controller commands.',
209-
# )
210205
self.add_config_option(
211206
section='ping',
212207
key='peer',
@@ -462,7 +457,6 @@ def go(self):
462457
raise ReceptorRuntimeError("you must specify a subcommand (%s)." % (", ".join(SUBCOMMAND_EXTRAS.keys()),))
463458
self._parsed_args.func(self)
464459

465-
466460
def get_client_ssl_context(self):
467461
if self.auth_ssl_cert:
468462
logger.debug("Loading SSL Client Context")

receptor/controller.py

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import uuid
55
from urllib.parse import urlparse
66

7-
from .ws import WSServer
7+
from .ws import WSServer, WSClient
88
from .protocol import BasicProtocol, create_peer
99
from .receptor import Receptor
1010
from .messages import envelope
@@ -22,29 +22,37 @@ def __init__(self, config, loop=asyncio.get_event_loop(), queue=None):
2222
self.queue = asyncio.Queue(loop=loop)
2323
self.receptor.response_queue = self.queue
2424

25-
def enable_server(self, listen_address, listen_port):
25+
def parse_peer(self, peer):
26+
if "://" not in peer:
27+
peer = f"receptor://{peer}"
28+
return urlparse(peer)
29+
30+
def enable_server(self, listen_url):
31+
service = self.parse_peer(listen_url)
2632
listener = self.loop.create_server(
2733
lambda: BasicProtocol(self.receptor, self.loop),
28-
listen_address, listen_port,
34+
service.hostname, service.port,
2935
ssl=self.receptor.config.get_server_ssl_context())
30-
logger.info("Serving on {}:{}".format(listen_address, listen_port))
31-
# TODO: Enable stats?
36+
logger.info("Serving on {}:{}".format(service.hostname, service.port))
3237
self.loop.create_task(listener)
3338

34-
def enable_websocket_server(self, listen_address, listen_port):
39+
def enable_websocket_server(self, listen_url):
40+
service = urlparse(listen_url)
3541
listener = self.loop.create_server(
3642
WSServer(self.receptor, self.loop).app().make_handler(),
37-
listen_address, listen_port,
43+
service.hostname, service.port,
3844
ssl=self.receptor.config.get_server_ssl_context())
39-
logger.info("Server ws on {}:{}".format(listen_address, listen_port))
45+
logger.info("Serving websockets on {}:{}".format(service.hostname, service.port))
4046
self.loop.create_task(listener)
4147

4248
async def add_peer(self, peer):
43-
if "://" not in peer:
44-
peer = f"receptor://{peer}"
45-
peer = urlparse(peer)
46-
logger.info("Connecting to peer {}".format(peer))
47-
await self.loop.create_task(create_peer(self.receptor, self.loop, peer.hostname, peer.port))
49+
parsed = self.parse_peer(peer)
50+
if parsed.scheme == 'receptor':
51+
logger.info("Connecting to receptor peer {}".format(peer))
52+
await self.loop.create_task(create_peer(self.receptor, self.loop, parsed.hostname, parsed.port))
53+
elif parsed.scheme in ('ws', 'wss'):
54+
logger.info("Connecting to websocket peer {}".format(peer))
55+
await self.loop.create_task(WSClient(self.receptor, self.loop).connect(peer))
4856

4957
async def recv(self):
5058
inner = await self.receptor.response_queue.get()

receptor/entrypoints.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,9 @@ async def node_keepalive():
2727
logger.info(f'Starting stats on port {config.node_stats_port}')
2828
start_http_server(config.node_stats_port)
2929
if not config.node_server_disable:
30-
controller.enable_server(config.node_listen_address,
31-
config.node_listen_port)
30+
controller.enable_server(config.node_listen)
31+
if config.node_websocket_listen:
32+
controller.enable_websocket_server(config.node_websocket_listen)
3233
for peer in config.node_peers:
3334
controller.loop.create_task(controller.add_peer(peer))
3435
if config.node_keepalive_interval > 1:
@@ -42,8 +43,9 @@ def run_as_controller(config):
4243
if config.controller_stats_enable:
4344
logger.info(f'Starting stats on port {config.node_stats_port}')
4445
start_http_server(config.controller_stats_port)
45-
controller.enable_server(config.controller_listen_address,
46-
config.controller_listen_port)
46+
controller.enable_server(config.controller_listen)
47+
if config.controller_websocket_listen:
48+
controller.enable_websocket_server(config.controller_websocket_listen)
4749
controller.loop.create_task(controller.receptor.watch_expire())
4850
controller.run()
4951

0 commit comments

Comments
 (0)