Skip to content

Commit fa0c54d

Browse files
committed
refactoring connection into a package
Signed-off-by: Jesse Jaggars <jjaggars@redhat.com>
1 parent b2f8822 commit fa0c54d

4 files changed

Lines changed: 88 additions & 57 deletions

File tree

Lines changed: 1 addition & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,8 @@
11
import logging
22

3-
import functools
43
import asyncio
5-
import aiohttp
6-
import aiohttp.web
74

8-
from .messages.envelope import FramedBuffer
5+
from ..messages.envelope import FramedBuffer
96

107
logger = logging.getLogger(__name__)
118

@@ -28,25 +25,6 @@ async def send(self, bytes_):
2825
raise NotImplementedError("subclasses should implement this")
2926

3027

31-
class WebSocket(Transport):
32-
def __init__(self, ws):
33-
self.ws = ws
34-
35-
async def __anext__(self):
36-
msg = await self.ws.__anext__()
37-
return msg.data
38-
39-
async def close(self):
40-
return await self.ws.close()
41-
42-
@property
43-
def closed(self):
44-
return self.ws.closed
45-
46-
async def send(self, bytes_):
47-
await self.ws.send_bytes(bytes_)
48-
49-
5028
async def watch_queue(conn, buf):
5129
while not conn.closed:
5230
try:
@@ -145,35 +123,3 @@ async def server(self, transport):
145123
await self.start_processing()
146124
finally:
147125
self.unregister()
148-
149-
150-
async def connect(uri, factory, loop=None):
151-
if not loop:
152-
loop = asyncio.get_event_loop()
153-
154-
worker = factory()
155-
try:
156-
async with aiohttp.ClientSession().ws_connect(uri) as ws:
157-
t = WebSocket(ws)
158-
await worker.client(t)
159-
except Exception:
160-
logger.exception("connect")
161-
finally:
162-
await asyncio.sleep(5)
163-
logger.debug("reconnecting")
164-
loop.create_task(connect(uri, factory=factory, loop=loop))
165-
166-
167-
async def serve(request, factory):
168-
ws = aiohttp.web.WebSocketResponse()
169-
await ws.prepare(request)
170-
171-
t = WebSocket(ws)
172-
await factory().server(t)
173-
174-
175-
def app(factory):
176-
handler = functools.partial(serve, factory=factory)
177-
app = aiohttp.web.Application()
178-
app.add_routes([aiohttp.web.get("/", handler)])
179-
return app

receptor/connection/sock.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
from . import Transport
2+
3+
4+
class RawSocket(Transport):
5+
def __init__(self, reader, writer):
6+
self.reader = reader
7+
self.writer = writer
8+
self._closed = False
9+
10+
async def __anext__(self):
11+
return await self.reader.read()
12+
13+
@property
14+
def closed(self):
15+
return self._closed
16+
17+
async def close(self):
18+
self._closed = True
19+
await self.writer.close()
20+
21+
async def send(self, bytes_):
22+
self.writer.write(bytes_)
23+
await self.writer.drain()

receptor/connection/ws.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
import logging
2+
import functools
3+
import aiohttp
4+
import aiohttp.web
5+
import asyncio
6+
7+
from . import Transport
8+
9+
logger = logging.getLogger(__name__)
10+
11+
12+
class WebSocket(Transport):
13+
def __init__(self, ws):
14+
self.ws = ws
15+
16+
async def __anext__(self):
17+
msg = await self.ws.__anext__()
18+
return msg.data
19+
20+
async def close(self):
21+
return await self.ws.close()
22+
23+
@property
24+
def closed(self):
25+
return self.ws.closed
26+
27+
async def send(self, bytes_):
28+
await self.ws.send_bytes(bytes_)
29+
30+
31+
async def connect(uri, factory, loop=None):
32+
if not loop:
33+
loop = asyncio.get_event_loop()
34+
35+
worker = factory()
36+
try:
37+
async with aiohttp.ClientSession().ws_connect(uri) as ws:
38+
t = WebSocket(ws)
39+
await worker.client(t)
40+
except Exception:
41+
logger.exception("connect")
42+
finally:
43+
await asyncio.sleep(5)
44+
logger.debug("reconnecting")
45+
loop.create_task(connect(uri, factory=factory, loop=loop))
46+
47+
48+
async def serve(request, factory):
49+
ws = aiohttp.web.WebSocketResponse()
50+
await ws.prepare(request)
51+
52+
t = WebSocket(ws)
53+
await factory().server(t)
54+
55+
56+
def app(factory):
57+
handler = functools.partial(serve, factory=factory)
58+
app = aiohttp.web.Application()
59+
app.add_routes([aiohttp.web.get("/", handler)])
60+
return app

receptor/controller.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
from .receptor import Receptor
99
from .messages import envelope
1010
from . import connection
11+
from .connection import ws, Worker
12+
from . import protocol
1113

1214
logger = logging.getLogger(__name__)
1315

@@ -38,9 +40,9 @@ def enable_server(self, listen_url):
3840

3941
def enable_websocket_server(self, listen_url):
4042
service = urlparse(listen_url)
41-
factory = lambda: connection.Worker(receptor, loop)
43+
factory = lambda: Worker(receptor, loop)
4244
listener = self.loop.create_server(
43-
connection.app(factory).make_handler(),
45+
ws.app(factory).make_handler(),
4446
service.hostname, service.port,
4547
ssl=self.receptor.config.get_server_ssl_context())
4648
logger.info("Serving websockets on {}:{}".format(service.hostname, service.port))

0 commit comments

Comments
 (0)