Skip to content

Commit 7aa5f58

Browse files
authored
Merge pull request #38 from jhjaggars/connection-refactor
Connection refactor
2 parents 7dec995 + 14576d9 commit 7aa5f58

4 files changed

Lines changed: 141 additions & 150 deletions

File tree

receptor/connection.py

Lines changed: 0 additions & 119 deletions
This file was deleted.

receptor/protocol.py

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -37,32 +37,36 @@ class BaseProtocol(asyncio.Protocol):
3737
def __init__(self, receptor, loop):
3838
self.receptor = receptor
3939
self.loop = loop
40-
self.connection = None
40+
self.id = None
41+
self.meta = None
4142

42-
async def watch_queue(self, node, transport):
43+
def __str__(self):
44+
return f"<Connection {self.id} {self.transport}"
45+
46+
async def watch_queue(self):
4347
'''
4448
Watches the buffer for this connection for messages delivered from other
4549
parts of Receptor (forwarded messages for example) for messages to send
4650
over the connection.
4751
'''
4852
buffer_mgr = self.receptor.config.components_buffer_manager
49-
buffer_obj = buffer_mgr.get_buffer_for_node(node, self.receptor)
50-
while not transport.is_closing():
53+
buffer_obj = buffer_mgr.get_buffer_for_node(self.id, self.receptor)
54+
while not self.transport.is_closing():
5155
try:
5256
msg = buffer_obj.pop()
53-
transport.write(msg + DELIM)
57+
self.transport.write(msg + DELIM)
5458
except IndexError:
5559
await asyncio.sleep(0.1)
5660
except ReceptorBufferError as e:
5761
logger.exception("Receptor Buffer Read Error: {}".format(e))
5862
# TODO: We need to try to send this message along somewhere else
5963
# and record the failure somewhere
60-
transport.close()
64+
self.transport.close()
6165
return
6266
except Exception as e:
63-
logger.exception("Error received trying to write to {}: {}".format(node, e))
67+
logger.exception("Error received trying to write to {}: {}".format(self.id, e))
6468
buffer_obj.push(msg)
65-
transport.close()
69+
self.transport.close()
6670
return
6771

6872
def connection_made(self, transport):
@@ -73,8 +77,7 @@ def connection_made(self, transport):
7377
self.loop.create_task(self.wait_greeting())
7478

7579
def connection_lost(self, exc):
76-
if self.connection is not None:
77-
self.receptor.remove_connection(self.connection)
80+
self.receptor.remove_connection(self)
7881

7982
def data_received(self, data):
8083
logger.debug(data)
@@ -100,9 +103,11 @@ async def wait_greeting(self):
100103

101104
def handle_handshake(self, data):
102105
self.greeted = True
103-
self.connection = self.receptor.add_connection(data["id"], data.get("meta", {}), self)
104-
self.loop.create_task(self.watch_queue(data["id"], self.transport))
105-
self.loop.create_task(self.connection.message_handler(self.incoming_buffer))
106+
self.id = data["id"]
107+
self.meta = data.get("meta", {})
108+
self.receptor.add_connection(self)
109+
self.loop.create_task(self.watch_queue())
110+
self.loop.create_task(self.receptor.message_handler(self.incoming_buffer))
106111

107112
def send_handshake(self):
108113
msg = json.dumps({
@@ -125,7 +130,7 @@ def handle_handshake(self, data):
125130
super().handle_handshake(data)
126131
logger.debug("Received handshake from client with id %s, responding...", data["id"])
127132
self.send_handshake()
128-
self.connection.send_route_advertisement()
133+
self.receptor.send_route_advertisement()
129134

130135

131136
async def create_peer(receptor, loop, host, port):
@@ -155,7 +160,7 @@ def connection_lost(self, exc):
155160
def handle_handshake(self, data):
156161
super().handle_handshake(data)
157162
logger.debug("Received handshake from server with id %s", data["id"])
158-
self.connection.send_route_advertisement()
163+
self.receptor.send_route_advertisement()
159164

160165

161166
class BasicControllerProtocol(asyncio.Protocol):

receptor/receptor.py

Lines changed: 113 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@
88

99
from .router import MeshRouter
1010
from .work import WorkManager
11-
from .connection import Connection
11+
from .messages import envelope, directive
12+
from . import exceptions
1213

14+
RECEPTOR_DIRECTIVE_NAMESPACE = 'receptor'
1315
logger = logging.getLogger(__name__)
1416

1517

@@ -26,6 +28,7 @@ def __init__(self, config, node_id=None, router_cls=None,
2628
if not os.path.exists(self.base_path):
2729
os.makedirs(os.path.join(self.config.default_data_dir, self.node_id))
2830
self.connection_manifest_path = os.path.join(self.base_path, "connection_manifest")
31+
self.buffer_mgr = self.config.components_buffer_manager
2932
self.stop = False
3033

3134
def _find_node_id(self):
@@ -42,7 +45,7 @@ async def watch_expire(self):
4245
while True:
4346
current_manifest = self.get_connection_manifest()
4447
for connection in current_manifest:
45-
buffer = self.config.components_buffer_manager.get_buffer_for_node(connection["id"], self)
48+
buffer = self.buffer_mgr.get_buffer_for_node(connection["id"], self)
4649
for ident, message in buffer:
4750
message_actual = json.loads(message)
4851
if "expire_time" in message_actual and message_actual['expire_time'] < time.time():
@@ -84,20 +87,26 @@ def update_connection_manifest(self, connection):
8487
manifest.append(dict(id=connection,
8588
last=time.time()))
8689
self.write_connection_manifest(manifest)
87-
88-
def update_connections(self, connection):
89-
self.router.register_edge(connection.id_, self.node_id, 1)
90-
if connection.id_ in self.connections:
91-
self.connections[connection.id_].append(connection)
90+
91+
def update_connections(self, protocol_obj):
92+
self.router.register_edge(protocol_obj.id, self.node_id, 1)
93+
if protocol_obj.id in self.connections:
94+
self.connections[protocol_obj.id].append(protocol_obj)
9295
else:
93-
self.connections[connection.id_] = [connection]
94-
self.update_connection_manifest(connection.id_)
96+
self.connections[protocol_obj.id] = [protocol_obj]
97+
self.update_connection_manifest(protocol_obj.id)
98+
99+
async def message_handler(self, buf):
100+
while True:
101+
for data in buf.get():
102+
if "cmd" in data and data["cmd"] == "ROUTE":
103+
self.handle_route_advertisement(data)
104+
else:
105+
await self.handle_message(data)
106+
await asyncio.sleep(.1)
95107

96-
def add_connection(self, id_, meta, protocol_obj):
97-
buffer_mgr = self.config.components_buffer_manager
98-
conn = Connection(id_, meta, protocol_obj, buffer_mgr, self)
99-
self.update_connections(conn)
100-
return conn
108+
def add_connection(self, protocol_obj):
109+
self.update_connections(protocol_obj)
101110

102111
def remove_connection(self, protocol_obj):
103112
notify_connections = []
@@ -110,11 +119,99 @@ def remove_connection(self, protocol_obj):
110119
self.router.debug_router()
111120
self.update_connection_manifest(connection_node)
112121
notify_connections += self.connections[connection_node]
113-
for active_connection in notify_connections:
114-
active_connection.send_route_advertisement(self.router.get_edges())
122+
self.send_route_advertisement(self.router.get_edges())
115123

116124
async def shutdown_handler(self):
117125
while True:
118126
if self.stop:
119127
return
120128
await asyncio.sleep(1)
129+
130+
def handle_route_advertisement(self, data):
131+
self.router.add_edges(data["edges"])
132+
self.send_route_advertisement(data["edges"], data["seen"])
133+
134+
def send_route_advertisement(self, edges=None, seen=[]):
135+
edges = edges or self.router.get_edges()
136+
seen = set(seen)
137+
logger.debug("Emitting Route Advertisements, excluding {}".format(seen))
138+
destinations = set(self.connections) - seen
139+
seens = list(seen | destinations | {self.node_id})
140+
141+
# TODO: This should be a broadcast call to the connection manager
142+
for target in destinations:
143+
buf = self.buffer_mgr.get_buffer_for_node(target, self)
144+
try:
145+
buf.push(json.dumps({
146+
"cmd": "ROUTE",
147+
"id": self.node_id,
148+
"capabilities": self.work_manager.get_capabilities(),
149+
"groups": self.config.node_groups,
150+
"edges": edges,
151+
"seen": seens
152+
}).encode("utf-8"))
153+
except exceptions.ReceptorBufferError as e:
154+
logger.exception("Receptor Buffer Write Error broadcasting routes and capabilities: {}".format(e))
155+
# TODO: This might should be a hard shutdown event
156+
except Exception as e:
157+
logger.exception("Error trying to broadcast routes and capabilities: {}".format(e))
158+
159+
async def handle_directive(self, outer_env):
160+
try:
161+
namespace, _ = outer_env.inner_obj.directive.split(':', 1)
162+
if namespace == RECEPTOR_DIRECTIVE_NAMESPACE:
163+
await directive.control(self.router, outer_env.inner_obj)
164+
else:
165+
# other namespace/work directives
166+
await self.work_manager.handle(outer_env.inner_obj)
167+
except ValueError:
168+
logger.error("error in handle_message: Invalid directive -> '%s'. Sending failure response back." % (outer_env.inner_obj.directive,))
169+
err_resp = outer_env.inner_obj.make_response(
170+
receptor=self,
171+
recipient=outer_env.inner_obj.sender,
172+
payload="An invalid directive ('%s') was specified." % (outer_env.inner_obj.directive,),
173+
in_response_to=outer_env.inner_obj.message_id,
174+
serial=outer_env.inner_obj.serial + 1,
175+
ttl=15,
176+
code=1,
177+
)
178+
await self.router.send(err_resp)
179+
except Exception as e:
180+
logger.error("error in handle_message: '%s'. Sending failure response back." % (str(e),))
181+
err_resp = outer_env.inner_obj.make_response(
182+
receptor=self,
183+
recipient=outer_env.inner_obj.sender,
184+
payload=str(e),
185+
in_response_to=outer_env.inner_obj.message_id,
186+
serial=outer_env.inner_obj.serial + 1,
187+
ttl=15,
188+
code=1,
189+
)
190+
await self.router.send(err_resp)
191+
192+
async def handle_response(self, outer_env):
193+
in_response_to = outer_env.inner_obj.in_response_to
194+
if in_response_to in self.router.response_registry:
195+
logger.info(f'Handling response to {in_response_to} with callback.')
196+
for connection in self.controller_connections:
197+
connection.emit_response(outer_env.inner_obj)
198+
else:
199+
logger.warning(f'Received response to {in_response_to} but no record of sent message.')
200+
201+
async def handle_message(self, msg):
202+
handlers = dict(
203+
directive=self.handle_directive,
204+
response=self.handle_response,
205+
)
206+
outer_env = envelope.OuterEnvelope(**msg)
207+
next_hop = self.router.next_hop(outer_env.recipient)
208+
if next_hop:
209+
return await self.router.forward(outer_env, next_hop)
210+
211+
await outer_env.deserialize_inner(self)
212+
213+
if outer_env.inner_obj.message_type not in handlers:
214+
raise exceptions.UnknownMessageType(
215+
f'Unknown message type: {outer_env.inner_obj.message_type}')
216+
217+
await handlers[outer_env.inner_obj.message_type](outer_env)

receptor/router.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,14 @@ def find_edge(self, left, right):
5252
return edge
5353
return None
5454

55+
def add_edges(self, edges):
56+
for edge in edges:
57+
existing_edge = self.find_edge(edge[0], edge[1])
58+
if existing_edge and existing_edge[2] > edge[2]:
59+
self.update_node(edge[0], edge[1], edge[2])
60+
else:
61+
self.register_edge(*edge)
62+
5563
def register_edge(self, left, right, cost):
5664
if left != self.node_id:
5765
self._nodes.add(left)

0 commit comments

Comments
 (0)