Skip to content

Commit c235058

Browse files
committed
incoming buffer is now async
Signed-off-by: Jesse Jaggars <jjaggars@redhat.com>
1 parent 8b3cfb3 commit c235058

3 files changed

Lines changed: 41 additions & 48 deletions

File tree

receptor/protocol.py

Lines changed: 17 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,8 @@
66
import time
77
import uuid
88

9-
from collections import deque
10-
11-
from .messages import envelope
129
from .exceptions import ReceptorBufferError
10+
from .messages import envelope
1311
from .stats import connected_peers_guage
1412

1513
logger = logging.getLogger(__name__)
@@ -19,19 +17,19 @@
1917

2018

2119
class DataBuffer:
22-
def __init__(self, deserializer=json.loads):
23-
self.q = deque()
20+
def __init__(self, loop=None, deserializer=json.loads):
21+
self.q = asyncio.Queue(loop=loop)
2422
self.data_buffer = b""
2523
self.deserializer = deserializer
2624

2725
def add(self, data):
2826
self.data_buffer = self.data_buffer + data
2927
*ready, self.data_buffer = self.data_buffer.rsplit(DELIM)
30-
self.q.extend(ready)
28+
for chunk in ready:
29+
self.q.put_nowait(chunk)
3130

32-
def get(self):
33-
while self.q:
34-
yield self.deserializer(self.q.popleft())
31+
async def get(self):
32+
return self.deserializer(await self.q.get())
3533

3634

3735
class BaseProtocol(asyncio.Protocol):
@@ -74,8 +72,8 @@ def connection_made(self, transport):
7472
self.peername = transport.get_extra_info('peername')
7573
self.transport = transport
7674
self.greeted = False
77-
self.incoming_buffer = DataBuffer()
7875
connected_peers_guage.inc()
76+
self.incoming_buffer = DataBuffer(loop=self.loop)
7977
self.loop.create_task(self.wait_greeting())
8078

8179
def connection_lost(self, exc):
@@ -91,18 +89,15 @@ async def wait_greeting(self):
9189
Initialized when the connection is established to handle the greeting
9290
before transitioning to message processing.
9391
'''
94-
while not self.greeted:
95-
logger.debug('Looking for handshake...')
96-
for data in self.incoming_buffer.get():
97-
logger.debug(data)
98-
if data["cmd"] == "HI":
99-
self.handle_handshake(data)
100-
break
101-
else:
102-
logger.error("Handshake failed!")
103-
self.transport.close()
104-
await asyncio.sleep(.1)
105-
logger.debug("handshake complete, starting normal handle loop")
92+
logger.debug('Looking for handshake...')
93+
data = await self.incoming_buffer.get()
94+
logger.debug(data)
95+
if data["cmd"] == "HI":
96+
self.handle_handshake(data)
97+
logger.debug("handshake complete, starting normal handle loop")
98+
else:
99+
logger.error("Handshake failed!")
100+
self.transport.close()
106101

107102
def handle_handshake(self, data):
108103
self.greeted = True

receptor/receptor.py

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
1-
import os
2-
import json
3-
import uuid
4-
import time
51
import asyncio
6-
import logging
72
import copy
3+
import json
4+
import logging
5+
import os
6+
import time
7+
import uuid
88

9+
from . import exceptions
10+
from .messages import directive, envelope
911
from .router import MeshRouter
10-
from .work import WorkManager
11-
from .messages import envelope, directive
1212
from .stats import messages_received_counter
13-
from . import exceptions
13+
from .work import WorkManager
1414

1515
RECEPTOR_DIRECTIVE_NAMESPACE = 'receptor'
1616
logger = logging.getLogger(__name__)
@@ -99,12 +99,11 @@ def update_connections(self, protocol_obj):
9999

100100
async def message_handler(self, buf):
101101
while True:
102-
for data in buf.get():
103-
if "cmd" in data and data["cmd"] == "ROUTE":
104-
self.handle_route_advertisement(data)
105-
else:
106-
await self.handle_message(data)
107-
await asyncio.sleep(.1)
102+
data = await buf.get()
103+
if "cmd" in data and data["cmd"] == "ROUTE":
104+
self.handle_route_advertisement(data)
105+
else:
106+
await self.handle_message(data)
108107

109108
def add_connection(self, protocol_obj):
110109
self.update_connections(protocol_obj)

receptor/tests/test_protocol.py

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,34 @@
11
import pytest
2+
23
from receptor import protocol
34

45

56
def deser(x):
67
return x
78

89

9-
def test_databuffer():
10+
@pytest.mark.asyncio
11+
async def test_databuffer():
1012
b = protocol.DataBuffer(deserializer=deser)
1113
msg = b"this is a test"
1214
s = protocol.DELIM + msg + protocol.DELIM
1315
b.add(s)
14-
it = b.get()
15-
assert next(it) == b""
16-
assert next(it) == msg
16+
assert await b.get() == b""
17+
assert await b.get() == msg
1718

1819

1920
def test_databuffer_no_delim():
2021
b = protocol.DataBuffer(deserializer=deser)
2122
msg = b"this is a test"
2223
b.add(msg)
23-
with pytest.raises(StopIteration):
24-
next(b.get())
24+
assert b.q.empty()
2525

2626

27-
def test_databuffer_many_msgs():
27+
@pytest.mark.asyncio
28+
async def test_databuffer_many_msgs():
2829
b = protocol.DataBuffer(deserializer=deser)
2930
msg = [b"first bit", b"second bit", b"third bit unfinished"]
3031
b.add(protocol.DELIM.join(msg))
31-
it = b.get()
32-
assert next(it) == msg[0]
33-
assert next(it) == msg[1]
34-
with pytest.raises(StopIteration):
35-
next(it)
32+
assert msg[0] == await b.get()
33+
assert msg[1] == await b.get()
34+
assert b.q.empty()

0 commit comments

Comments
 (0)