Skip to content

Commit d176eee

Browse files
authored
Merge pull request #59 from jhjaggars/protocol-to-stream
defining the interface for a connection
2 parents 2f1d6e3 + d56c039 commit d176eee

12 files changed

Lines changed: 311 additions & 318 deletions

File tree

receptor/__main__.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ def main(args=None):
2121
'disable_existing_loggers': False,
2222
'formatters': {
2323
'verbose': {
24-
'format': '{levelname} {asctime} {module} {message}',
24+
'format': '{levelname} {asctime} {node_id} {module} {message}',
2525
'style': '{',
2626
}
2727
},
@@ -40,6 +40,13 @@ def main(args=None):
4040
}
4141
)
4242

43+
def _f(record):
44+
record.node_id = config.default_node_id
45+
return True
46+
47+
for h in logging.getLogger('receptor').handlers:
48+
h.addFilter(_f)
49+
4350
try:
4451
config.go()
4552
except Exception:

receptor/config.py

Lines changed: 5 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -110,18 +110,10 @@ def __init__(self, args=None):
110110
self.add_config_option(
111111
section='node',
112112
key='listen',
113-
default_value='receptor://0.0.0.0:8888',
114-
value_type='str',
113+
default_value=['receptor://0.0.0.0:8888'],
114+
value_type='list',
115115
hint='Set/override IP address and port to listen on. If not set here or in a config file, the default is receptor://0.0.0.0:8888.',
116116
)
117-
self.add_config_option(
118-
section='node',
119-
key='websocket_listen',
120-
default_value='',
121-
value_type='str',
122-
hint='Set IP address and port to listen on for websocket clients in the form ws[s]://addr:port. '
123-
'If not set here or in a config file then it is disabled',
124-
)
125117
self.add_config_option(
126118
section='node',
127119
key='peers',
@@ -182,17 +174,9 @@ def __init__(self, args=None):
182174
self.add_config_option(
183175
section='controller',
184176
key='listen',
185-
default_value='receptor://0.0.0.0:8888',
186-
value_type='str',
187-
hint='Set IP address and port to listen on. If not set here or in a config file, the default is receptor://0.0.0.0/0:8888.',
188-
)
189-
self.add_config_option(
190-
section='controller',
191-
key='websocket_listen',
192-
default_value='',
193-
value_type='str',
194-
hint='Set IP address and port to listen on for websocket clients in the form ws://addr:port. '
195-
'If not set here or in a config file then it is disabled',
177+
default_value=['receptor://0.0.0.0:8888'],
178+
value_type='list',
179+
hint='Set IP address and port to listen on. If not set here or in a config file, the default is receptor://0.0.0.0/0:8888. This option can be passed multiple times.',
196180
)
197181
self.add_config_option(
198182
section='controller',

receptor/connection/__init__.py

Whitespace-only changes.

receptor/connection/base.py

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
import logging
2+
3+
import asyncio
4+
from collections.abc import AsyncIterator
5+
from abc import abstractmethod, abstractproperty
6+
7+
from ..messages.envelope import FramedBuffer
8+
9+
logger = logging.getLogger(__name__)
10+
11+
12+
class Transport(AsyncIterator):
13+
@abstractmethod
14+
async def close(self):
15+
pass
16+
17+
@abstractproperty
18+
def closed(self):
19+
pass
20+
21+
@abstractmethod
22+
async def send(self, bytes_):
23+
pass
24+
25+
26+
async def watch_queue(conn, buf):
27+
while not conn.closed:
28+
try:
29+
msg = await asyncio.wait_for(buf.get(), 5.0)
30+
except asyncio.TimeoutError:
31+
continue
32+
except Exception:
33+
logger.exception("watch_queue: error getting data from buffer")
34+
continue
35+
36+
try:
37+
await conn.send(msg)
38+
except Exception:
39+
logger.exception("watch_queue: error received trying to write")
40+
await buf.put(msg)
41+
return await conn.close()
42+
43+
44+
class Worker:
45+
def __init__(self, receptor, loop):
46+
self.receptor = receptor
47+
self.loop = loop
48+
self.conn = None
49+
self.buf = FramedBuffer(loop=self.loop)
50+
self.remote_id = None
51+
self.read_task = None
52+
self.handle_task = None
53+
self.write_task = None
54+
55+
def start_receiving(self):
56+
self.read_task = self.loop.create_task(self.receive())
57+
58+
async def receive(self):
59+
try:
60+
async for msg in self.conn:
61+
await self.buf.put(msg)
62+
except Exception:
63+
logger.exception("receive")
64+
65+
def register(self):
66+
self.receptor.update_connections(self.conn, id_=self.remote_id)
67+
68+
def unregister(self):
69+
self.receptor.remove_connection(self.conn, id_=self.remote_id, loop=self.loop)
70+
self._cancel(self.read_task)
71+
self._cancel(self.handle_task)
72+
self._cancel(self.write_task)
73+
74+
def _cancel(self, task):
75+
if task:
76+
task.cancel()
77+
78+
async def hello(self):
79+
logger.debug("sending HI")
80+
msg = self.receptor._say_hi().serialize()
81+
await self.conn.send(msg)
82+
83+
async def start_processing(self):
84+
logger.debug("sending routes")
85+
await self.receptor.send_route_advertisement()
86+
logger.debug("starting normal loop")
87+
self.handle_task = self.loop.create_task(
88+
self.receptor.message_handler(self.buf)
89+
)
90+
out = self.receptor.buffer_mgr.get_buffer_for_node(
91+
self.remote_id, self.receptor
92+
)
93+
self.write_task = self.loop.create_task(watch_queue(self.conn, out))
94+
return await self.write_task
95+
96+
async def _wait_handshake(self):
97+
logger.debug("waiting for HI")
98+
response = await self.buf.get() # TODO: deal with timeout
99+
self.remote_id = response.header["id"]
100+
self.register()
101+
102+
async def client(self, transport):
103+
try:
104+
self.conn = transport
105+
self.start_receiving()
106+
await self.hello()
107+
await self._wait_handshake()
108+
await self.start_processing()
109+
logger.debug("normal exit")
110+
finally:
111+
self.unregister()
112+
113+
async def server(self, transport):
114+
try:
115+
self.conn = transport
116+
self.start_receiving()
117+
await self._wait_handshake()
118+
await self.hello()
119+
await self.start_processing()
120+
finally:
121+
self.unregister()

receptor/connection/manager.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
import functools
2+
import asyncio
3+
from urllib.parse import urlparse
4+
5+
from . import sock, ws
6+
7+
8+
def parse_peer(peer):
9+
if "://" not in peer:
10+
peer = f"receptor://{peer}"
11+
return urlparse(peer)
12+
13+
14+
class Manager:
15+
def __init__(self, factory, ssl_context, loop=None):
16+
self.factory = factory
17+
self.ssl_context = ssl_context
18+
self.loop = loop or asyncio.get_event_loop()
19+
20+
def get_listener(self, listen_url):
21+
service = parse_peer(listen_url)
22+
if service.scheme == "receptor":
23+
return asyncio.start_server(
24+
functools.partial(sock.serve, factory=self.factory),
25+
host=service.hostname,
26+
port=service.port,
27+
ssl=self.ssl_context,
28+
)
29+
elif service.scheme in ("ws", "wss"):
30+
return self.loop.create_server(
31+
ws.app(self.factory).make_handler(),
32+
service.hostname,
33+
service.port,
34+
ssl=self.ssl_context,
35+
)
36+
37+
def get_peer(self, peer):
38+
service = parse_peer(peer)
39+
if service.scheme == "receptor":
40+
return sock.connect(service.hostname, service.port, self.factory, self.loop)
41+
elif service.scheme in ("ws", "wss"):
42+
return ws.connect(peer, self.factory, self.loop)

receptor/connection/sock.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
import asyncio
2+
import logging
3+
from .base import Transport
4+
5+
logger = logging.getLogger(__name__)
6+
7+
8+
class RawSocket(Transport):
9+
def __init__(self, reader, writer, chunk_size=2 ** 8):
10+
self.reader = reader
11+
self.writer = writer
12+
self._closed = False
13+
self.chunk_size = chunk_size
14+
15+
async def __anext__(self):
16+
bytes_ = await self.reader.read(self.chunk_size)
17+
return bytes_
18+
19+
@property
20+
def closed(self):
21+
return self._closed
22+
23+
async def close(self):
24+
self._closed = True
25+
await self.writer.close()
26+
27+
async def send(self, bytes_):
28+
self.writer.write(bytes_)
29+
await self.writer.drain()
30+
31+
32+
async def connect(host, port, factory, loop=None):
33+
if not loop:
34+
loop = asyncio.get_event_loop()
35+
36+
worker = factory()
37+
try:
38+
r, w = await asyncio.open_connection(host, port, loop=loop)
39+
t = RawSocket(r, w)
40+
await worker.client(t)
41+
except Exception:
42+
logger.exception("sock.connect")
43+
finally:
44+
await asyncio.sleep(5)
45+
logger.debug("sock.connect: reconnection")
46+
loop.create_task(connect(host, port, factory, loop))
47+
48+
49+
async def serve(reader, writer, factory):
50+
t = RawSocket(reader, writer)
51+
await factory().server(t)

receptor/connection/ws.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
import logging
2+
import functools
3+
import aiohttp
4+
import aiohttp.web
5+
import asyncio
6+
7+
from .base import Transport
8+
9+
logger = logging.getLogger(__name__)
10+
11+
12+
class WebSocket(Transport):
13+
def __init__(self, ws):
14+
self.ws = ws
15+
16+
async def __anext__(self):
17+
msg = await self.ws.__anext__()
18+
return msg.data
19+
20+
async def close(self):
21+
return await self.ws.close()
22+
23+
@property
24+
def closed(self):
25+
return self.ws.closed
26+
27+
async def send(self, bytes_):
28+
await self.ws.send_bytes(bytes_)
29+
30+
31+
async def connect(uri, factory, loop=None):
32+
if not loop:
33+
loop = asyncio.get_event_loop()
34+
35+
worker = factory()
36+
try:
37+
async with aiohttp.ClientSession().ws_connect(uri) as ws:
38+
t = WebSocket(ws)
39+
await worker.client(t)
40+
except Exception:
41+
logger.exception("ws.connect")
42+
finally:
43+
await asyncio.sleep(5)
44+
logger.debug("ws.connect: reconnecting")
45+
loop.create_task(connect(uri, factory=factory, loop=loop))
46+
47+
48+
async def serve(request, factory):
49+
ws = aiohttp.web.WebSocketResponse()
50+
await ws.prepare(request)
51+
52+
t = WebSocket(ws)
53+
await factory().server(t)
54+
55+
56+
def app(factory):
57+
handler = functools.partial(serve, factory=factory)
58+
app = aiohttp.web.Application()
59+
app.add_routes([aiohttp.web.get("/", handler)])
60+
return app

0 commit comments

Comments
 (0)