Skip to content

Commit 7763416

Browse files
committed
wiring up streams for regular sockets
Signed-off-by: Jesse Jaggars <jjaggars@redhat.com>
1 parent fa0c54d commit 7763416

3 files changed

Lines changed: 35 additions & 6 deletions

File tree

receptor/connection/sock.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
1+
import asyncio
2+
import logging
13
from . import Transport
24

5+
logger = logging.getLogger(__name__)
6+
37

48
class RawSocket(Transport):
59
def __init__(self, reader, writer):
@@ -21,3 +25,25 @@ async def close(self):
2125
async def send(self, bytes_):
2226
self.writer.write(bytes_)
2327
await self.writer.drain()
28+
29+
30+
async def connect(host, port, factory, loop=None):
31+
if not loop:
32+
loop = asyncio.get_event_loop()
33+
34+
worker = factory()
35+
try:
36+
r, w = await asyncio.open_connection(host, port, loop=loop)
37+
t = RawSocket(r, w)
38+
await worker.client(t)
39+
except Exception:
40+
logger.exception("sock.connect")
41+
finally:
42+
await asyncio.sleep(5)
43+
logger.debug("sock.connect: reconnection")
44+
loop.create_task(connect(host, port, factory, loop))
45+
46+
47+
async def serve(reader, writer, factory):
48+
t = RawSocket(reader, writer)
49+
await factory().server(t)

receptor/connection/ws.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,10 @@ async def connect(uri, factory, loop=None):
3838
t = WebSocket(ws)
3939
await worker.client(t)
4040
except Exception:
41-
logger.exception("connect")
41+
logger.exception("ws.connect")
4242
finally:
4343
await asyncio.sleep(5)
44-
logger.debug("reconnecting")
44+
logger.debug("ws.connect: reconnecting")
4545
loop.create_task(connect(uri, factory=factory, loop=loop))
4646

4747

receptor/controller.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import asyncio
22
import datetime
3+
import functools
34
import logging
45
import uuid
56
from urllib.parse import urlparse
@@ -8,7 +9,7 @@
89
from .receptor import Receptor
910
from .messages import envelope
1011
from . import connection
11-
from .connection import ws, Worker
12+
from .connection import ws, sock, Worker
1213
from . import protocol
1314

1415
logger = logging.getLogger(__name__)
@@ -31,9 +32,11 @@ def parse_peer(self, peer):
3132

3233
def enable_server(self, listen_url):
3334
service = self.parse_peer(listen_url)
34-
listener = self.loop.create_server(
35-
lambda: BasicProtocol(self.receptor, self.loop),
36-
service.hostname, service.port,
35+
cb = functools.partial(sock.serve, factory=factory)
36+
listener = asyncio.start_server(
37+
client_connected_cb,
38+
host=service.hostname,
39+
port=service.port,
3740
ssl=self.receptor.config.get_server_ssl_context())
3841
logger.info("Serving on {}:{}".format(service.hostname, service.port))
3942
self.loop.create_task(listener)

0 commit comments

Comments
 (0)