Skip to content

Commit 41e7159

Browse files
committed
using framed messages
Signed-off-by: Jesse Jaggars <jjaggars@redhat.com>
1 parent 7fb416f commit 41e7159

5 files changed

Lines changed: 164 additions & 104 deletions

File tree

receptor/messages/envelope.py

Lines changed: 84 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,51 @@
55
import itertools
66
import json
77
import logging
8+
import struct
89
import time
910
import uuid
10-
from struct import pack, unpack
11+
from enum import IntEnum
1112

1213
logger = logging.getLogger(__name__)
1314

1415
MAX_INT64 = (2 ** 64 - 1)
1516

1617

18+
class FramedMessage:
19+
"""
20+
A complete message constructed from one or more Frames.
21+
"""
22+
23+
__slots__ = ("msg_id", "header", "payload")
24+
25+
def __init__(self, msg_id=None, header=None, payload=None):
26+
if msg_id is None:
27+
msg_id = uuid.uuid4().int
28+
self.msg_id = msg_id
29+
self.header = header
30+
self.payload = payload
31+
32+
33+
def serialize(self):
34+
h = json.dumps(self.header).encode("utf-8")
35+
return b''.join([
36+
Frame.wrap(h, type_=Frame.Types.HEADER, msg_id=self.msg_id).serialize(),
37+
h,
38+
Frame.wrap(self.payload, msg_id=self.msg_id).serialize(),
39+
self.payload
40+
])
41+
42+
43+
class CommandMessage(FramedMessage):
44+
45+
def serialize(self):
46+
h = json.dumps(self.header).encode("utf-8")
47+
return b''.join([
48+
Frame.wrap(h, type_=Frame.Types.COMMAND, msg_id=self.msg_id).serialize(),
49+
h,
50+
])
51+
52+
1753
class FramedBuffer:
1854
"""
1955
A buffer that accumulates frames and bytes to produce a header and a
@@ -33,11 +69,11 @@ async def put(self, data):
3369
if not self.to_read:
3470
return await self.handle_frame(data)
3571
await self.consume(data)
36-
72+
3773
async def handle_frame(self, data):
3874
self.current_frame, rest = Frame.from_data(data)
3975

40-
if self.current_frame.type not in (Frame.HEADER, Frame.PAYLOAD):
76+
if self.current_frame.type not in Frame.Types:
4177
raise Exception("Unknown Frame Type")
4278

4379
self.to_read = self.current_frame.length
@@ -50,11 +86,18 @@ async def consume(self, data):
5086
await self.finish()
5187

5288
async def finish(self):
53-
if self.current_frame.type == Frame.HEADER:
54-
self.header = Header.from_bytes(self.bb)
55-
elif self.current_frame.type == Frame.PAYLOAD:
56-
await self.q.put((self.header, self.bb))
89+
if self.current_frame.type == Frame.Types.HEADER:
90+
self.header = json.loads(self.bb)
91+
elif self.current_frame.type == Frame.Types.PAYLOAD:
92+
await self.q.put(FramedMessage(
93+
self.current_frame.msg_id, header=self.header,
94+
payload=self.bb))
5795
self.header = None
96+
elif self.current_frame.type == Frame.Types.COMMAND:
97+
await self.q.put(FramedMessage(
98+
self.current_frame.msg_id, header=json.loads(self.bb)))
99+
else:
100+
raise Exception("Unknown Frame Type")
58101
self.to_read = 0
59102
self.bb = bytearray()
60103

@@ -63,11 +106,17 @@ async def get(self):
63106

64107

65108
class Frame:
66-
HEADER = 0
67-
PAYLOAD = 1
109+
110+
class Types(IntEnum):
111+
HEADER = 0
112+
PAYLOAD = 1
113+
COMMAND = 2
114+
115+
fmt = struct.Struct(">ccIIQQ")
68116

69117
__slots__ = ('type', 'version', 'length', 'msg_id', 'id')
70118

119+
71120
def __init__(self, type_, version, length, msg_id, id_):
72121
self.type = type_
73122
self.version = version
@@ -76,17 +125,29 @@ def __init__(self, type_, version, length, msg_id, id_):
76125
self.id = id_
77126

78127
def serialize(self):
79-
return pack(">ccIIQQ", bytes([self.type]), bytes([self.version]), self.id, self.length, *split_uuid(self.msg_id))
128+
return self.fmt.pack(
129+
bytes([self.type]), bytes([self.version]),
130+
self.id, self.length, *split_uuid(self.msg_id))
80131

81132
@classmethod
82133
def deserialize(cls, buf):
83-
t, v, i, length, hi, lo = unpack(">ccIIQQ", buf)
134+
t, v, i, length, hi, lo = Frame.fmt.unpack(buf)
84135
msg_id = join_uuid(hi, lo)
85-
return cls(ord(t), ord(v), length, msg_id, i)
136+
return cls(Frame.Types(ord(t)), ord(v), length, msg_id, i)
86137

87138
@classmethod
88139
def from_data(cls, data):
89-
return cls.deserialize(data[:26]), data[26:]
140+
return cls.deserialize(data[:Frame.fmt.size]), data[Frame.fmt.size:]
141+
142+
@classmethod
143+
def wrap(cls, data, type_=Types.PAYLOAD, msg_id=None):
144+
"""
145+
Returns a frame for the passed data.
146+
"""
147+
if not msg_id:
148+
msg_id = uuid.uuid4().int
149+
150+
return cls(type_, 1, len(data), msg_id, 1)
90151

91152

92153
def split_uuid(data):
@@ -97,24 +158,10 @@ def join_uuid(hi, lo):
97158
return (hi << 64) | lo
98159

99160

100-
class Header:
101-
def __init__(self, sender, recipient, route_list):
102-
self.sender = sender
103-
self.recipient = recipient
104-
self.route_list = route_list
105-
106-
def serialize(self):
107-
return json.dumps({"sender": self.sender, "recipient": self.recipient, "route_list": self.route_list}).encode("utf-8")
108-
109-
@classmethod
110-
def from_bytes(cls, data):
111-
return cls(**json.loads(data))
112-
113-
def __repr__(self):
114-
return f"Header({self.sender}, {self.recipient}, {self.route_list})"
115-
116-
def __eq__(self, other):
117-
return (self.sender, self.recipient, self.route_list) == (other.sender, other.recipient, other.route_list)
161+
def glue(header, payload):
162+
hf = Frame.wrap(header, type_=Frame.Types.HEADER)
163+
pf = Frame.wrap(payload, msg_id=hf.msg_id)
164+
return hf.serialize() + header + pf.serialize() + payload
118165

119166

120167
def gen_chunks(data, header, msg_id=None, chunksize=2 ** 8):
@@ -123,9 +170,9 @@ def gen_chunks(data, header, msg_id=None, chunksize=2 ** 8):
123170
seq = itertools.count()
124171
buf = bytearray(chunksize)
125172
bv = memoryview(buf)
126-
header = header.serialize()
127-
yield Frame(Frame.HEADER, 1, len(header), msg_id, next(seq)).serialize() + header
128-
yield Frame(Frame.PAYLOAD, 1, len(data), msg_id, next(seq)).serialize()
173+
header = json.dumps(header).encode("utf-8")
174+
yield Frame(Frame.Types.HEADER, 1, len(header), msg_id, next(seq)).serialize() + header
175+
yield Frame(Frame.Types.PAYLOAD, 1, len(data), msg_id, next(seq)).serialize()
129176
buffer = io.BytesIO(data)
130177
bytes_read = buffer.readinto(buf)
131178
while bytes_read:
@@ -146,13 +193,13 @@ def __init__(self, frame_id, sender, recipient, route_list, inner):
146193
self.inner_obj = None
147194

148195
async def deserialize_inner(self, receptor):
149-
self.inner_obj = await InnerEnvelope.deserialize(receptor, self.inner)
196+
self.inner_obj = await Inner.deserialize(receptor, self.inner)
150197

151198
@classmethod
152199
def from_raw(cls, raw):
153200
doc = json.loads(raw)
154201
return cls(**doc)
155-
202+
156203
def serialize(self):
157204
return json.dumps(dict(
158205
frame_id=self.frame_id,
@@ -163,7 +210,7 @@ def serialize(self):
163210
))
164211

165212

166-
class InnerEnvelope:
213+
class Inner:
167214
def __init__(self, receptor, message_id, sender, recipient, message_type, timestamp,
168215
raw_payload, directive=None, in_response_to=None, ttl=None, serial=1,
169216
code=0, expire_time_delta=300):

receptor/protocol.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ def connection_made(self, transport):
6868
self.peername = transport.get_extra_info('peername')
6969
self.transport = transport
7070
connected_peers_gauge.inc()
71-
self.incoming_buffer = DataBuffer(loop=self.loop)
71+
self.incoming_buffer = envelope.FramedBuffer(loop=self.loop)
7272
self.loop.create_task(self.wait_greeting())
7373

7474
def connection_lost(self, exc):
@@ -77,7 +77,7 @@ def connection_lost(self, exc):
7777

7878
def data_received(self, data):
7979
logger.debug(data)
80-
self.incoming_buffer.add(data)
80+
self.loop.create_task(self.incoming_buffer.put(data))
8181

8282
async def wait_greeting(self):
8383
'''
@@ -86,9 +86,9 @@ async def wait_greeting(self):
8686
'''
8787
logger.debug('Looking for handshake...')
8888
data = await self.incoming_buffer.get()
89-
logger.debug(data)
90-
if data["cmd"] == "HI":
91-
self.handle_handshake(data)
89+
logger.debug(data.header)
90+
if data.header["cmd"] == "HI":
91+
self.handle_handshake(data.header)
9292
logger.debug("handshake complete, starting normal handle loop")
9393
else:
9494
logger.error("Handshake failed!")
@@ -102,15 +102,15 @@ def handle_handshake(self, data):
102102
self.loop.create_task(self.receptor.message_handler(self.incoming_buffer))
103103

104104
def send_handshake(self):
105-
msg = json.dumps({
105+
msg = envelope.CommandMessage(header={
106106
"cmd": "HI",
107107
"id": self.receptor.node_id,
108108
"expire_time": time.time() + 10,
109109
"meta": dict(capabilities=self.receptor.work_manager.get_capabilities(),
110110
groups=self.receptor.config.node_groups,
111111
work=self.receptor.work_manager.get_work())
112-
}).encode("utf-8")
113-
self.transport.write(msg + DELIM)
112+
})
113+
self.transport.write(msg.serialize())
114114

115115

116116
class BasicProtocol(BaseProtocol):

receptor/receptor.py

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -94,11 +94,11 @@ def update_connections(self, protocol_obj):
9494

9595
async def message_handler(self, buf):
9696
while True:
97-
data = await buf.get()
97+
header, data = await buf.get()
9898
if "cmd" in data and data["cmd"] == "ROUTE":
9999
await self.handle_route_advertisement(data)
100100
else:
101-
await self.handle_message(data)
101+
await self.handle_message(header, data)
102102

103103
def add_connection(self, protocol_obj):
104104
self.update_connections(protocol_obj)
@@ -137,74 +137,74 @@ async def send_route_advertisement(self, edges=None, seen=[]):
137137
for target in destinations:
138138
buf = self.buffer_mgr.get_buffer_for_node(target, self)
139139
try:
140-
await buf.put(json.dumps({
140+
msg = envelope.CommandMessage(header={
141141
"cmd": "ROUTE",
142142
"id": self.node_id,
143143
"capabilities": self.work_manager.get_capabilities(),
144144
"groups": self.config.node_groups,
145145
"edges": edges,
146146
"seen": seens
147-
}).encode("utf-8"))
147+
})
148+
await buf.put(msg.serialize())
148149
except Exception as e:
149150
logger.exception("Error trying to broadcast routes and capabilities: {}".format(e))
150151

151-
async def handle_directive(self, outer_env):
152+
async def handle_directive(self, inner):
152153
try:
153-
namespace, _ = outer_env.inner_obj.directive.split(':', 1)
154+
namespace, _ = inner.directive.split(':', 1)
154155
if namespace == RECEPTOR_DIRECTIVE_NAMESPACE:
155-
await directive.control(self.router, outer_env.inner_obj)
156+
await directive.control(self.router, inner)
156157
else:
157158
# other namespace/work directives
158-
await self.work_manager.handle(outer_env.inner_obj)
159+
await self.work_manager.handle(inner)
159160
except ValueError:
160-
logger.error("error in handle_message: Invalid directive -> '%s'. Sending failure response back." % (outer_env.inner_obj.directive,))
161-
err_resp = outer_env.inner_obj.make_response(
161+
logger.error("error in handle_message: Invalid directive -> '%s'. Sending failure response back." % (inner.directive,))
162+
err_resp = inner.make_response(
162163
receptor=self,
163-
recipient=outer_env.inner_obj.sender,
164-
payload="An invalid directive ('%s') was specified." % (outer_env.inner_obj.directive,),
165-
in_response_to=outer_env.inner_obj.message_id,
166-
serial=outer_env.inner_obj.serial + 1,
164+
recipient=inner.sender,
165+
payload="An invalid directive ('%s') was specified." % (inner.directive,),
166+
in_response_to=inner.message_id,
167+
serial=inner.serial + 1,
167168
ttl=15,
168169
code=1,
169170
)
170171
await self.router.send(err_resp)
171172
except Exception as e:
172173
logger.error("error in handle_message: '%s'. Sending failure response back." % (str(e),))
173-
err_resp = outer_env.inner_obj.make_response(
174+
err_resp = inner.make_response(
174175
receptor=self,
175-
recipient=outer_env.inner_obj.sender,
176+
recipient=inner.sender,
176177
payload=str(e),
177-
in_response_to=outer_env.inner_obj.message_id,
178-
serial=outer_env.inner_obj.serial + 1,
178+
in_response_to=inner.message_id,
179+
serial=inner.serial + 1,
179180
ttl=15,
180181
code=1,
181182
)
182183
await self.router.send(err_resp)
183184

184-
async def handle_response(self, outer_env):
185-
in_response_to = outer_env.inner_obj.in_response_to
185+
async def handle_response(self, inner):
186+
in_response_to = inner.in_response_to
186187
if in_response_to in self.router.response_registry:
187188
logger.info(f'Handling response to {in_response_to} with callback.')
188189
for connection in self.controller_connections:
189-
connection.emit_response(outer_env.inner_obj)
190+
connection.emit_response(inner)
190191
else:
191192
logger.warning(f'Received response to {in_response_to} but no record of sent message.')
192193

193-
async def handle_message(self, msg):
194+
async def handle_message(self, header, msg):
194195
handlers = dict(
195196
directive=self.handle_directive,
196197
response=self.handle_response,
197198
)
198199
messages_received_counter.inc()
199-
outer_env = envelope.OuterEnvelope(**msg)
200-
next_hop = self.router.next_hop(outer_env.recipient)
200+
next_hop = self.router.next_hop(header.recipient)
201201
if next_hop:
202-
return await self.router.forward(outer_env, next_hop)
202+
return await self.router.forward(header, msg, next_hop)
203203

204-
await outer_env.deserialize_inner(self)
204+
inner = await envelope.Inner.deserialize(self, msg)
205205

206-
if outer_env.inner_obj.message_type not in handlers:
206+
if inner.message_type not in handlers:
207207
raise exceptions.UnknownMessageType(
208-
f'Unknown message type: {outer_env.inner_obj.message_type}')
208+
f'Unknown message type: {inner.message_type}')
209209

210-
await handlers[outer_env.inner_obj.message_type](outer_env)
210+
await handlers[inner.message_type](inner)

receptor/router.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -177,14 +177,14 @@ async def send(self, inner_envelope, expected_response=False):
177177
#TODO: This probably needs to emit an error response
178178
raise UnrouteableError(f'No route found to {inner_envelope.recipient}')
179179
signed = await inner_envelope.sign_and_serialize()
180-
outer_envelope = envelope.OuterEnvelope(
181-
frame_id=str(uuid.uuid4()),
182-
sender=self.node_id,
183-
recipient=inner_envelope.recipient,
184-
route_list=[self.node_id],
185-
inner=signed
186-
)
180+
181+
header = {
182+
"sender": self.node_id,
183+
"recipient": inner_envelope.recipient,
184+
"route_list": [self.node_id]
185+
}
186+
msg = envelope.FramedMessage(uuid.uuid4().int, header, signed)
187187
logger.debug(f'Sending {inner_envelope.message_id} to {inner_envelope.recipient} via {next_node_id}')
188188
if expected_response and inner_envelope.message_type == 'directive':
189189
self.response_registry[inner_envelope.message_id] = dict(message_sent_time=inner_envelope.timestamp)
190-
await self.forward(outer_envelope, next_node_id)
190+
await self.forward(msg, next_node_id)

0 commit comments

Comments
 (0)