Skip to content

Commit f64096c

Browse files
authored
Merge pull request #51 from jhjaggars/envelope-rework
Envelope rework
2 parents b1f1ae0 + 3e52ec7 commit f64096c

11 files changed

Lines changed: 403 additions & 163 deletions

File tree

Pipfile.lock

Lines changed: 17 additions & 16 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

receptor/buffers/file.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,13 @@ def __init__(self, dir_, key, loop):
2727
pass
2828
for item in self._read_manifest():
2929
self.q.put_nowait(item)
30-
30+
3131
async def put(self, data):
3232
ident = str(uuid.uuid4())
3333
await self._loop.run_in_executor(pool, self._write_file, data, ident)
3434
await self.q.put(ident)
3535
await self._save_manifest()
36-
36+
3737
async def get(self, handle_only=False, delete=True):
3838
while True:
3939
msg = await self.q.get()
@@ -42,21 +42,25 @@ async def get(self, handle_only=False, delete=True):
4242
return await self._get_file(msg, handle_only=handle_only, delete=delete)
4343
except FileNotFoundError:
4444
pass
45-
45+
4646
async def _save_manifest(self):
4747
async with self._manifest_lock:
4848
await self._loop.run_in_executor(pool, self._write_manifest)
49-
49+
5050
def _write_manifest(self):
5151
with open(self._manifest_path, "w") as fp:
5252
json.dump(list(self.q._queue), fp)
53-
53+
5454
def _read_manifest(self):
5555
try:
5656
with open(self._manifest_path, "r") as fp:
5757
return json.load(fp)
5858
except FileNotFoundError:
5959
return []
60+
except json.decoder.JSONDecodeError:
61+
with open(self._manifest_path, "r") as fp:
62+
logger.error("failed to decode manifest: %s", fp.read())
63+
raise
6064

6165
def _path_for_ident(self, ident):
6266
return os.path.join(self._message_path, ident)

receptor/messages/directive.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import datetime
2-
import logging
32
import json
3+
import logging
44

55
from ..exceptions import UnknownDirective
66
from . import envelope
@@ -26,7 +26,7 @@ async def __call__(self, router, inner_env):
2626
serial = 0
2727
async for response in responses:
2828
serial += 1
29-
enveloped_response = envelope.InnerEnvelope.make_response(
29+
enveloped_response = envelope.Inner.make_response(
3030
receptor=router.receptor,
3131
recipient=inner_env.sender,
3232
payload=response,

receptor/messages/envelope.py

Lines changed: 170 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,187 @@
1+
import asyncio
12
import base64
23
import datetime
34
import json
45
import logging
5-
import uuid
6+
import struct
67
import time
8+
import uuid
9+
from enum import IntEnum
710

811
logger = logging.getLogger(__name__)
912

13+
MAX_INT64 = (2 ** 64 - 1)
1014

11-
class OuterEnvelope:
12-
def __init__(self, frame_id, sender, recipient, route_list, inner):
13-
self.frame_id = frame_id
14-
self.sender = sender
15-
self.recipient = recipient
16-
self.route_list = route_list
17-
self.inner = inner
18-
self.inner_obj = None
1915

20-
async def deserialize_inner(self, receptor):
21-
self.inner_obj = await InnerEnvelope.deserialize(receptor, self.inner)
16+
class FramedMessage:
17+
"""
18+
A complete, two-part message.
19+
"""
20+
21+
__slots__ = ("msg_id", "header", "payload")
22+
23+
def __init__(self, msg_id=None, header=None, payload=None):
24+
if msg_id is None:
25+
msg_id = uuid.uuid4().int
26+
self.msg_id = msg_id
27+
self.header = header
28+
self.payload = payload
29+
30+
def serialize(self):
31+
h = json.dumps(self.header).encode("utf-8")
32+
return b''.join([
33+
Frame.wrap(h, type_=Frame.Types.HEADER, msg_id=self.msg_id).serialize(),
34+
h,
35+
Frame.wrap(self.payload, msg_id=self.msg_id).serialize(),
36+
self.payload
37+
])
38+
39+
40+
class CommandMessage(FramedMessage):
41+
"""
42+
A complete, single part message, meant to encapsulate point to point
43+
commands or naive broadcasts.
44+
"""
45+
46+
def serialize(self):
47+
h = json.dumps(self.header).encode("utf-8")
48+
return b''.join([
49+
Frame.wrap(h, type_=Frame.Types.COMMAND, msg_id=self.msg_id).serialize(),
50+
h,
51+
])
52+
53+
54+
class FramedBuffer:
55+
"""
56+
A buffer that accumulates frames and bytes to produce a header and a
57+
payload.
58+
59+
This buffer assumes that an entire message (denoted by msg_id) will be
60+
sent before another message is sent.
61+
"""
62+
def __init__(self, loop=None):
63+
self.q = asyncio.Queue(loop=loop)
64+
self.header = None
65+
self.framebuffer = bytearray()
66+
self.bb = bytearray()
67+
self.current_frame = None
68+
self.to_read = 0
69+
70+
async def put(self, data):
71+
if not self.to_read:
72+
return await self.handle_frame(data)
73+
await self.consume(data)
74+
75+
async def handle_frame(self, data):
76+
try:
77+
self.framebuffer += data
78+
frame, rest = Frame.from_data(self.framebuffer)
79+
except struct.error:
80+
return # We don't have enough data yet
81+
else:
82+
self.framebuffer = bytearray()
83+
84+
if frame.type not in Frame.Types:
85+
raise Exception("Unknown Frame Type")
86+
87+
self.current_frame = frame
88+
self.to_read = self.current_frame.length
89+
await self.consume(rest)
90+
91+
async def consume(self, data):
92+
logger.debug("consuming %d bytes; to_read = %d bytes", len(data), self.to_read)
93+
data, rest = data[:self.to_read], data[self.to_read:]
94+
self.to_read -= len(data)
95+
self.bb += data
96+
if self.to_read == 0:
97+
await self.finish()
98+
if rest:
99+
await self.handle_frame(rest)
100+
101+
async def finish(self):
102+
if self.current_frame.type == Frame.Types.HEADER:
103+
self.header = json.loads(self.bb)
104+
elif self.current_frame.type == Frame.Types.PAYLOAD:
105+
await self.q.put(FramedMessage(
106+
self.current_frame.msg_id, header=self.header,
107+
payload=self.bb))
108+
self.header = None
109+
elif self.current_frame.type == Frame.Types.COMMAND:
110+
await self.q.put(FramedMessage(
111+
self.current_frame.msg_id, header=json.loads(self.bb)))
112+
else:
113+
raise Exception("Unknown Frame Type")
114+
self.to_read = 0
115+
self.bb = bytearray()
116+
117+
async def get(self):
118+
return await self.q.get()
119+
120+
121+
class Frame:
122+
"""
123+
A Frame represents the minimal metadata about a transmission.
124+
125+
Usually you should not create one directly, but rather use the
126+
FramedMessage or CommandMessage classes.
127+
"""
128+
129+
class Types(IntEnum):
130+
HEADER = 0
131+
PAYLOAD = 1
132+
COMMAND = 2
133+
134+
fmt = struct.Struct(">ccIIQQ")
135+
136+
__slots__ = ('type', 'version', 'length', 'msg_id', 'id')
137+
138+
def __init__(self, type_, version, length, msg_id, id_):
139+
self.type = type_
140+
self.version = version
141+
self.length = length
142+
self.msg_id = msg_id
143+
self.id = id_
144+
145+
def __repr__(self):
146+
return f"Frame({self.type}, {self.version}, {self.length}, {self.msg_id}, {self.id})"
22147

23-
@classmethod
24-
def from_raw(cls, raw):
25-
doc = json.loads(raw)
26-
return cls(**doc)
27-
28148
def serialize(self):
29-
return json.dumps(dict(
30-
frame_id=self.frame_id,
31-
sender=self.sender,
32-
recipient=self.recipient,
33-
route_list=self.route_list,
34-
inner=self.inner
35-
))
149+
return self.fmt.pack(
150+
bytes([self.type]), bytes([self.version]),
151+
self.id, self.length, *split_uuid(self.msg_id))
152+
153+
@classmethod
154+
def deserialize(cls, buf):
155+
t, v, i, length, hi, lo = Frame.fmt.unpack(buf)
156+
msg_id = join_uuid(hi, lo)
157+
return cls(Frame.Types(ord(t)), ord(v), length, msg_id, i)
158+
159+
@classmethod
160+
def from_data(cls, data):
161+
return cls.deserialize(data[:Frame.fmt.size]), data[Frame.fmt.size:]
162+
163+
@classmethod
164+
def wrap(cls, data, type_=Types.PAYLOAD, msg_id=None):
165+
"""
166+
Returns a frame for the passed data.
167+
"""
168+
if not msg_id:
169+
msg_id = uuid.uuid4().int
170+
171+
return cls(type_, 1, len(data), msg_id, 1)
172+
173+
174+
def split_uuid(data):
175+
"Splits a 128 bit int into two 64 bit words for binary encoding"
176+
return ((data >> 64) & MAX_INT64, data & MAX_INT64)
177+
178+
179+
def join_uuid(hi, lo):
180+
"Joins two 64 bit words into a 128bit int from binary encoding"
181+
return (hi << 64) | lo
36182

37183

38-
class InnerEnvelope:
184+
class Inner:
39185
def __init__(self, receptor, message_id, sender, recipient, message_type, timestamp,
40186
raw_payload, directive=None, in_response_to=None, ttl=None, serial=1,
41187
code=0, expire_time_delta=300):

0 commit comments

Comments
 (0)