Skip to content

Commit 57f1bbd

Browse files
committed
adding some websocket classes
Signed-off-by: Jesse Jaggars <jjaggars@redhat.com>
1 parent d247942 commit 57f1bbd

1 file changed

Lines changed: 95 additions & 0 deletions

File tree

receptor/ws.py

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
import json
2+
import logging
3+
import time
4+
5+
import aiohttp
6+
7+
from .protocol import DataBuffer
8+
9+
logger = logging.getLogger(__name__)
10+
11+
12+
async def watch_queue(sock, buf):
13+
while sock.open:
14+
try:
15+
msg = await buf.get()
16+
except Exception:
17+
logger.exception("Error getting data from buffer")
18+
19+
try:
20+
sock.send(msg)
21+
except Exception:
22+
logger.exception("Error received trying to write")
23+
await buf.put(msg)
24+
return await sock.close()
25+
26+
27+
class WSClient:
28+
def __init__(self, receptor, loop):
29+
self.receptor = receptor
30+
self.loop = loop
31+
32+
async def connect(self, uri):
33+
async with aiohttp.ClientSession().ws_connect(uri) as sock:
34+
# handshake
35+
node_id = await self.handshake(sock)
36+
incoming_buffer = DataBuffer()
37+
self.loop.create_task(self.receive(sock, incoming_buffer)) # reader
38+
39+
buf = self.receptor.buffer_mgr.get_buffer_for_node(node_id, self.receptor)
40+
self.loop.create_task(watch_queue(sock, buf)) # writer
41+
42+
self.loop.create_task(self.connect(uri))
43+
44+
45+
async def handshake(self, sock):
46+
msg = json.dumps({
47+
"cmd": "HI",
48+
"id": self.receptor.node_id,
49+
"expire_time": time.time() + 10,
50+
"meta": {
51+
"capabilities": self.receptor.work_manager.get_capabilities(),
52+
"groups": self.receptor.config.node_groups,
53+
"work": self.receptor.work_manager.get_work(),
54+
}
55+
}).encode("utf-8")
56+
await sock.send_bytes(msg)
57+
response = await sock.receive().json()
58+
return response["id"]
59+
60+
async def receive(self, sock, buf):
61+
self.loop.create_task(self.receptor.message_handler(buf))
62+
async for msg in sock.receive():
63+
buf.add(msg.data)
64+
65+
66+
class WSServer:
67+
68+
def __init__(self, receptor, loop):
69+
self.receptor = receptor
70+
self.loop = loop
71+
72+
async def serve(self, request):
73+
74+
ws = aiohttp.web.WebSocketResponse()
75+
await ws.prepare(request)
76+
77+
handshake = await ws.receive().json()
78+
await ws.send_json({
79+
"cmd": "HI",
80+
"id": self.receptor.node_id,
81+
"expire_time": time.time() + 10,
82+
"meta": {
83+
"capabilities": self.receptor.work_manager.get_capabilities(),
84+
"groups": self.receptor.config.node_groups,
85+
"work": self.receptor.work_manager.get_work(),
86+
}
87+
})
88+
89+
buf = self.receptor.buffer_mgr.get_buffer_for_node(handshake["id"], self.receptor)
90+
self.loop.create_task(watch_queue(ws, buf)) # writer
91+
92+
incoming_buffer = DataBuffer()
93+
self.loop.create_task(self.receptor.message_handler(incoming_buffer))
94+
async for msg in ws:
95+
incoming_buffer.add(msg.data)

0 commit comments

Comments
 (0)