Skip to content

Commit 7fcd706

Browse files
committed
handling edge cases in consume
1. too much data 2. not enough data For #1 we make sure we read no more than `to_read` in any given consume operation. For #2 we make sure we have enough bytes on hand to unpack into a Frame, buffering if we don't. Signed-off-by: Jesse Jaggars <jjaggars@redhat.com>
1 parent 77a6faf commit 7fcd706

8 files changed

Lines changed: 89 additions & 102 deletions

File tree

receptor/messages/directive.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import datetime
2-
import logging
32
import json
3+
import logging
44

55
from ..exceptions import UnknownDirective
66
from . import envelope
@@ -26,7 +26,7 @@ async def __call__(self, router, inner_env):
2626
serial = 0
2727
async for response in responses:
2828
serial += 1
29-
enveloped_response = envelope.InnerEnvelope.make_response(
29+
enveloped_response = envelope.Inner.make_response(
3030
receptor=router.receptor,
3131
recipient=inner_env.sender,
3232
payload=response,

receptor/messages/envelope.py

Lines changed: 18 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
import asyncio
22
import base64
33
import datetime
4-
import io
5-
import itertools
64
import json
75
import logging
86
import struct
@@ -61,29 +59,42 @@ class FramedBuffer:
6159
def __init__(self, loop=None):
6260
self.q = asyncio.Queue(loop=loop)
6361
self.header = None
62+
self.framebuffer = bytearray()
6463
self.bb = bytearray()
6564
self.current_frame = None
6665
self.to_read = 0
6766

6867
async def put(self, data):
68+
logger.debug("put: %s ... %s", data[:16], data[-16:])
6969
if not self.to_read:
7070
return await self.handle_frame(data)
7171
await self.consume(data)
7272

7373
async def handle_frame(self, data):
74-
self.current_frame, rest = Frame.from_data(data)
74+
try:
75+
self.framebuffer += data
76+
frame, rest = Frame.from_data(self.framebuffer)
77+
except struct.error:
78+
return # We don't have enough data yet
79+
else:
80+
self.framebuffer = bytearray()
7581

76-
if self.current_frame.type not in Frame.Types:
82+
if frame.type not in Frame.Types:
7783
raise Exception("Unknown Frame Type")
7884

85+
self.current_frame = frame
7986
self.to_read = self.current_frame.length
8087
await self.consume(rest)
8188

8289
async def consume(self, data):
90+
logger.debug("consuming %d bytes; to_read = %d bytes", len(data), self.to_read)
91+
data, rest = data[:self.to_read], data[self.to_read:]
8392
self.to_read -= len(data)
8493
self.bb += data
8594
if self.to_read == 0:
8695
await self.finish()
96+
if rest:
97+
await self.handle_frame(rest)
8798

8899
async def finish(self):
89100
if self.current_frame.type == Frame.Types.HEADER:
@@ -124,6 +135,9 @@ def __init__(self, type_, version, length, msg_id, id_):
124135
self.msg_id = msg_id
125136
self.id = id_
126137

138+
def __repr__(self):
139+
return f"Frame({self.type}, {self.version}, {self.length}, {self.msg_id}, {self.id})"
140+
127141
def serialize(self):
128142
return self.fmt.pack(
129143
bytes([self.type]), bytes([self.version]),
@@ -158,58 +172,6 @@ def join_uuid(hi, lo):
158172
return (hi << 64) | lo
159173

160174

161-
def glue(header, payload):
162-
hf = Frame.wrap(header, type_=Frame.Types.HEADER)
163-
pf = Frame.wrap(payload, msg_id=hf.msg_id)
164-
return hf.serialize() + header + pf.serialize() + payload
165-
166-
167-
def gen_chunks(data, header, msg_id=None, chunksize=2 ** 8):
168-
if msg_id is None:
169-
msg_id = uuid.uuid4().int
170-
seq = itertools.count()
171-
buf = bytearray(chunksize)
172-
bv = memoryview(buf)
173-
header = json.dumps(header).encode("utf-8")
174-
yield Frame(Frame.Types.HEADER, 1, len(header), msg_id, next(seq)).serialize() + header
175-
yield Frame(Frame.Types.PAYLOAD, 1, len(data), msg_id, next(seq)).serialize()
176-
buffer = io.BytesIO(data)
177-
bytes_read = buffer.readinto(buf)
178-
while bytes_read:
179-
if bytes_read == chunksize:
180-
yield bv.tobytes()
181-
else:
182-
yield bv[:bytes_read].tobytes()
183-
bytes_read = buffer.readinto(buf)
184-
185-
186-
class OuterEnvelope:
187-
def __init__(self, frame_id, sender, recipient, route_list, inner):
188-
self.frame_id = frame_id
189-
self.sender = sender
190-
self.recipient = recipient
191-
self.route_list = route_list
192-
self.inner = inner
193-
self.inner_obj = None
194-
195-
async def deserialize_inner(self, receptor):
196-
self.inner_obj = await Inner.deserialize(receptor, self.inner)
197-
198-
@classmethod
199-
def from_raw(cls, raw):
200-
doc = json.loads(raw)
201-
return cls(**doc)
202-
203-
def serialize(self):
204-
return json.dumps(dict(
205-
frame_id=self.frame_id,
206-
sender=self.sender,
207-
recipient=self.recipient,
208-
route_list=self.route_list,
209-
inner=self.inner
210-
))
211-
212-
213175
class Inner:
214176
def __init__(self, receptor, message_id, sender, recipient, message_type, timestamp,
215177
raw_payload, directive=None, in_response_to=None, ttl=None, serial=1,

receptor/protocol.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,9 @@ async def watch_queue(self):
5757
continue
5858

5959
try:
60-
self.transport.write(msg + DELIM)
60+
logger.debug("about to write %s ... %s", msg[:8], msg[-8:])
61+
self.transport.write(msg)
62+
logger.debug("written successfully")
6163
except Exception:
6264
logger.exception("Error received trying to write to %s", self.id)
6365
await buffer_obj.put(msg)
@@ -76,7 +78,7 @@ def connection_lost(self, exc):
7678
self.receptor.remove_connection(self)
7779

7880
def data_received(self, data):
79-
logger.debug(data)
81+
logger.debug("recv: %s ... %s", data[:16], data[-16:])
8082
self.loop.create_task(self.incoming_buffer.put(data))
8183

8284
async def wait_greeting(self):
@@ -95,10 +97,12 @@ async def wait_greeting(self):
9597
self.transport.close()
9698

9799
def handle_handshake(self, data):
100+
logger.debug("handle_handshake: %s", data)
98101
self.id = data["id"]
99102
self.meta = data.get("meta", {})
100103
self.receptor.add_connection(self)
101104
self.loop.create_task(self.watch_queue())
105+
logger.debug("starting message_handler: %s", self.incoming_buffer)
102106
self.loop.create_task(self.receptor.message_handler(self.incoming_buffer))
103107

104108
def send_handshake(self):
@@ -180,14 +184,15 @@ def emit_response(self, response):
180184

181185
def _do_emit_callback(self, fut):
182186
res = fut.result()
183-
self.transport.write(res.encode() + DELIM)
187+
logger.debug("_do_emit_callback: %s", res)
188+
self.transport.write(res + DELIM)
184189

185190
def data_received(self, data):
186191
recipient, directive, payload = data.rstrip(DELIM).decode('utf8').split('\n', 2)
187192
message_id = str(uuid.uuid4())
188193
logger.info(f'{message_id}: Sending {directive} to {recipient}')
189194
sent_timestamp = datetime.datetime.utcnow()
190-
inner_env = envelope.InnerEnvelope(
195+
inner_env = envelope.Inner(
191196
receptor=self.receptor,
192197
message_id=message_id,
193198
sender=self.receptor.node_id,
@@ -212,7 +217,7 @@ def _data_received_callback(self, inner_env, fut):
212217
try:
213218
fut.result()
214219
except Exception as e:
215-
err_resp = envelope.InnerEnvelope.make_response(
220+
err_resp = envelope.Inner.make_response(
216221
receptor=self.receptor,
217222
recipient=inner_env.sender,
218223
payload=str(e),

receptor/receptor.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -93,12 +93,18 @@ def update_connections(self, protocol_obj):
9393
self.update_connection_manifest(protocol_obj.id)
9494

9595
async def message_handler(self, buf):
96+
logger.debug("spawning message_handler")
9697
while True:
97-
header, data = await buf.get()
98-
if "cmd" in data and data["cmd"] == "ROUTE":
99-
await self.handle_route_advertisement(data)
98+
try:
99+
data = await buf.get()
100+
except Exception:
101+
logger.exception("message_handler")
100102
else:
101-
await self.handle_message(header, data)
103+
logger.debug("message_handler: %s", data)
104+
if "cmd" in data.header and data.header["cmd"] == "ROUTE":
105+
await self.handle_route_advertisement(data.header)
106+
else:
107+
await self.handle_message(data)
102108

103109
def add_connection(self, protocol_obj):
104110
self.update_connections(protocol_obj)
@@ -191,17 +197,17 @@ async def handle_response(self, inner):
191197
else:
192198
logger.warning(f'Received response to {in_response_to} but no record of sent message.')
193199

194-
async def handle_message(self, header, msg):
200+
async def handle_message(self, msg):
195201
handlers = dict(
196202
directive=self.handle_directive,
197203
response=self.handle_response,
198204
)
199205
messages_received_counter.inc()
200-
next_hop = self.router.next_hop(header.recipient)
206+
next_hop = self.router.next_hop(msg.header["recipient"])
201207
if next_hop:
202-
return await self.router.forward(header, msg, next_hop)
208+
return await self.router.forward(msg, next_hop)
203209

204-
inner = await envelope.Inner.deserialize(self, msg)
210+
inner = await envelope.Inner.deserialize(self, msg.payload)
205211

206212
if inner.message_type not in handlers:
207213
raise exceptions.UnknownMessageType(

receptor/router.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ def get_nodes(self):
9797
async def ping_node(self, node_id, callback=log_ping):
9898
logger.info(f'Sending ping to node {node_id}')
9999
now = datetime.datetime.utcnow().isoformat()
100-
ping_envelope = envelope.InnerEnvelope(
100+
ping_envelope = envelope.Inner(
101101
receptor=self.receptor,
102102
message_id=str(uuid.uuid4()),
103103
sender=self.node_id,
@@ -137,17 +137,17 @@ def find_shortest_path(self, to_node_id):
137137
mins[next_vertex] = next_total_cost
138138
heapq.heappush(heap, (next_total_cost, next_vertex, path))
139139

140-
async def forward(self, outer_envelope, next_hop):
140+
async def forward(self, msg, next_hop):
141141
"""
142142
Forward a message on to the next hop closer to its destination
143143
"""
144144
buffer_mgr = self.receptor.config.components_buffer_manager
145145
buffer_obj = buffer_mgr.get_buffer_for_node(next_hop, self.receptor)
146-
outer_envelope.route_list.append(self.node_id)
147-
logger.debug(f'Forwarding frame {outer_envelope.frame_id} to {next_hop}')
146+
msg.header["route_list"].append(self.node_id)
147+
logger.debug(f'Forwarding frame {msg.msg_id} to {next_hop}')
148148
try:
149149
route_counter.inc()
150-
await buffer_obj.put(outer_envelope.serialize().encode("utf-8"))
150+
await buffer_obj.put(msg.serialize())
151151
except ReceptorBufferError as e:
152152
logger.exception("Receptor Buffer Write Error forwarding message to {}: {}".format(next_hop, e))
153153
# TODO: Possible to find another route? This might be a hard failure
@@ -183,7 +183,7 @@ async def send(self, inner_envelope, expected_response=False):
183183
"recipient": inner_envelope.recipient,
184184
"route_list": [self.node_id]
185185
}
186-
msg = envelope.FramedMessage(uuid.uuid4().int, header, signed)
186+
msg = envelope.FramedMessage(msg_id=uuid.uuid4().int, header=header, payload=signed)
187187
logger.debug(f'Sending {inner_envelope.message_id} to {inner_envelope.recipient} via {next_node_id}')
188188
if expected_response and inner_envelope.message_type == 'directive':
189189
self.response_registry[inner_envelope.message_id] = dict(message_sent_time=inner_envelope.timestamp)

receptor/security/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import json
22
import logging
3+
34
logger = logging.getLogger(__name__)
45

56

@@ -26,5 +27,4 @@ async def sign_response(self, inner_envelope):
2627
for attr in ['message_id', 'sender', 'recipient', 'message_type',
2728
'timestamp', 'raw_payload', 'directive',
2829
'in_response_to', 'ttl', 'serial', 'code']}
29-
)
30-
30+
).encode("utf-8")

receptor/tests/test_framedbuffer.py

Lines changed: 33 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
import pytest
55

6-
from receptor.messages.envelope import Frame, FramedBuffer, gen_chunks
6+
from receptor.messages.envelope import Frame, FramedBuffer, FramedMessage
77

88

99
@pytest.yield_fixture
@@ -12,8 +12,8 @@ def msg_id():
1212

1313

1414
@pytest.yield_fixture
15-
def framed_buffer():
16-
return FramedBuffer()
15+
def framed_buffer(event_loop):
16+
return FramedBuffer(loop=event_loop)
1717

1818

1919
@pytest.mark.asyncio
@@ -37,21 +37,6 @@ async def test_framedbuffer(framed_buffer, msg_id):
3737
assert m.payload == payload + payload2
3838

3939

40-
@pytest.mark.asyncio
41-
async def test_gen_chunks():
42-
43-
b = FramedBuffer()
44-
45-
header = {"sender": "node1", "recipient": "node2", "route_list": []}
46-
payload = b"this is a test with a buffer"
47-
for chunk in gen_chunks(payload, header):
48-
await b.put(chunk)
49-
50-
m = await b.get()
51-
assert m.header == header
52-
assert m.payload == payload
53-
54-
5540
@pytest.mark.asyncio
5641
async def test_hi(msg_id, framed_buffer):
5742
hi = json.dumps({"cmd": "hi"}).encode("utf-8")
@@ -94,3 +79,33 @@ async def test_command(framed_buffer, msg_id):
9479
m = await framed_buffer.get()
9580
assert m.header == cmd
9681
assert m.payload is None
82+
83+
84+
@pytest.mark.asyncio
85+
async def test_overfull(framed_buffer, msg_id):
86+
header = {"foo": "bar"}
87+
payload = b'this is a test'
88+
msg = FramedMessage(header=header, payload=payload)
89+
90+
await framed_buffer.put(msg.serialize())
91+
92+
m = await framed_buffer.get()
93+
94+
assert m.header == header
95+
assert m.payload == payload
96+
97+
98+
@pytest.mark.asyncio
99+
async def test_underfull(framed_buffer, msg_id):
100+
header = {"foo": "bar"}
101+
payload = b'this is a test'
102+
msg = FramedMessage(header=header, payload=payload)
103+
b = msg.serialize()
104+
105+
await framed_buffer.put(b[:10])
106+
await framed_buffer.put(b[10:])
107+
108+
m = await framed_buffer.get()
109+
110+
assert m.header == header
111+
assert m.payload == payload

0 commit comments

Comments
 (0)