Skip to content

Commit 4e85828

Browse files
committed
refactoring connection stuff
Signed-off-by: Jesse Jaggars <jjaggars@redhat.com>
1 parent 5123752 commit 4e85828

4 files changed

Lines changed: 111 additions & 124 deletions

File tree

receptor/connection.py

Lines changed: 1 addition & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -1,119 +1,8 @@
1-
import asyncio
2-
import json
3-
import logging
4-
5-
from . import exceptions
6-
from .messages import envelope, directive
7-
from .exceptions import ReceptorBufferError
8-
9-
logger = logging.getLogger(__name__)
10-
11-
RECEPTOR_DIRECTIVE_NAMESPACE = 'receptor'
12-
13-
141
class Connection:
15-
def __init__(self, id_, meta, protocol_obj, buffer_mgr, receptor):
2+
def __init__(self, id_, meta, protocol_obj):
163
self.id_ = id_
174
self.meta = meta
185
self.protocol_obj = protocol_obj
19-
self.buffer_mgr = buffer_mgr
20-
self.receptor = receptor
216

227
def __str__(self):
238
return f"<Connection {self.id_} {self.protocol_obj}>"
24-
25-
async def message_handler(self, buf):
26-
while True:
27-
for data in buf.get():
28-
if "cmd" in data and data["cmd"] == "ROUTE":
29-
self.handle_route_advertisement(data)
30-
else:
31-
await self.handle_message(data)
32-
await asyncio.sleep(.1)
33-
34-
def handle_route_advertisement(self, data):
35-
for edge in data["edges"]:
36-
existing_edge = self.receptor.router.find_edge(edge[0], edge[1])
37-
if existing_edge and existing_edge[2] > edge[2]:
38-
self.receptor.router.update_node(edge[0], edge[1], edge[2])
39-
else:
40-
self.receptor.router.register_edge(*edge)
41-
self.send_route_advertisement(data["edges"], data["seen"])
42-
43-
def send_route_advertisement(self, edges=None, seen=[]):
44-
edges = edges or self.receptor.router.get_edges()
45-
seen = set(seen)
46-
logger.debug("Emitting Route Advertisements, excluding {}".format(seen))
47-
destinations = set(self.receptor.connections) - seen
48-
seens = list(seen | destinations | {self.receptor.node_id})
49-
50-
# TODO: This should be a broadcast call to the connection manager
51-
for target in destinations:
52-
buf = self.buffer_mgr.get_buffer_for_node(target, self.receptor)
53-
try:
54-
buf.push(json.dumps({
55-
"cmd": "ROUTE",
56-
"id": self.receptor.node_id,
57-
"capabilities": self.receptor.work_manager.get_capabilities(),
58-
"groups": self.receptor.config.node_groups,
59-
"edges": edges,
60-
"seen": seens
61-
}).encode("utf-8"))
62-
except ReceptorBufferError as e:
63-
logger.exception("Receptor Buffer Write Error broadcasting routes and capabilities: {}".format(e))
64-
# TODO: This might should be a hard shutdown event
65-
except Exception as e:
66-
logger.exception("Error trying to broadcast routes and capabilities: {}".format(e))
67-
68-
69-
async def handle_message(self, msg):
70-
outer_env = envelope.OuterEnvelope(**msg)
71-
next_hop = self.receptor.router.next_hop(outer_env.recipient)
72-
if next_hop is None:
73-
await outer_env.deserialize_inner(self.receptor)
74-
if outer_env.inner_obj.message_type == 'directive':
75-
try:
76-
namespace, _ = outer_env.inner_obj.directive.split(':', 1)
77-
if namespace == RECEPTOR_DIRECTIVE_NAMESPACE:
78-
await directive.control(self.receptor.router, outer_env.inner_obj)
79-
else:
80-
# other namespace/work directives
81-
await self.receptor.work_manager.handle(outer_env.inner_obj)
82-
except ValueError:
83-
logger.error("error in handle_message: Invalid directive -> '%s'. Sending failure response back." % (outer_env.inner_obj.directive,))
84-
err_resp = outer_env.inner_obj.make_response(
85-
receptor=self.receptor,
86-
recipient=outer_env.inner_obj.sender,
87-
payload="An invalid directive ('%s') was specified." % (outer_env.inner_obj.directive,),
88-
in_response_to=outer_env.inner_obj.message_id,
89-
serial=outer_env.inner_obj.serial + 1,
90-
ttl=15,
91-
code=1,
92-
)
93-
await self.receptor.router.send(err_resp)
94-
except Exception as e:
95-
logger.error("error in handle_message: '%s'. Sending failure response back." % (str(e),))
96-
err_resp = outer_env.inner_obj.make_response(
97-
receptor=self.receptor,
98-
recipient=outer_env.inner_obj.sender,
99-
payload=str(e),
100-
in_response_to=outer_env.inner_obj.message_id,
101-
serial=outer_env.inner_obj.serial + 1,
102-
ttl=15,
103-
code=1,
104-
)
105-
await self.receptor.router.send(err_resp)
106-
elif outer_env.inner_obj.message_type == 'response':
107-
in_response_to = outer_env.inner_obj.in_response_to
108-
if in_response_to in self.receptor.router.response_registry:
109-
logger.info(f'Handling response to {in_response_to} with callback.')
110-
for connection in self.receptor.controller_connections:
111-
connection.emit_response(outer_env.inner_obj)
112-
else:
113-
logger.warning(f'Received response to {in_response_to} but no record of sent message.')
114-
else:
115-
raise exceptions.UnknownMessageType(
116-
f'Unknown message type: {outer_env.inner_obj.message_type}')
117-
else:
118-
await self.receptor.router.forward(outer_env, next_hop)
119-

receptor/protocol.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ class BaseProtocol(asyncio.Protocol):
3737
def __init__(self, receptor, loop):
3838
self.receptor = receptor
3939
self.loop = loop
40-
self.connection = None
4140

4241
async def watch_queue(self, node, transport):
4342
'''
@@ -73,8 +72,7 @@ def connection_made(self, transport):
7372
self.loop.create_task(self.wait_greeting())
7473

7574
def connection_lost(self, exc):
76-
if self.connection is not None:
77-
self.receptor.remove_connection(self.connection)
75+
self.receptor.remove_connection(self)
7876

7977
def data_received(self, data):
8078
logger.debug(data)
@@ -100,9 +98,9 @@ async def wait_greeting(self):
10098

10199
def handle_handshake(self, data):
102100
self.greeted = True
103-
self.connection = self.receptor.add_connection(data["id"], data.get("meta", {}), self)
101+
self.receptor.add_connection(data["id"], data.get("meta", {}), self)
104102
self.loop.create_task(self.watch_queue(data["id"], self.transport))
105-
self.loop.create_task(self.connection.message_handler(self.incoming_buffer))
103+
self.loop.create_task(self.receptor.message_handler(self.incoming_buffer))
106104

107105
def send_handshake(self):
108106
msg = json.dumps({
@@ -125,7 +123,7 @@ def handle_handshake(self, data):
125123
super().handle_handshake(data)
126124
logger.debug("Received handshake from client with id %s, responding...", data["id"])
127125
self.send_handshake()
128-
self.connection.send_route_advertisement()
126+
self.receptor.send_route_advertisement()
129127

130128

131129
async def create_peer(receptor, loop, host, port):
@@ -155,7 +153,7 @@ def connection_lost(self, exc):
155153
def handle_handshake(self, data):
156154
super().handle_handshake(data)
157155
logger.debug("Received handshake from server with id %s", data["id"])
158-
self.connection.send_route_advertisement()
156+
self.receptor.send_route_advertisement()
159157

160158

161159
class BasicControllerProtocol(asyncio.Protocol):

receptor/receptor.py

Lines changed: 97 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@
99
from .router import MeshRouter
1010
from .work import WorkManager
1111
from .connection import Connection
12+
from .messages import envelope, directive
13+
from . import exceptions
1214

15+
RECEPTOR_DIRECTIVE_NAMESPACE = 'receptor'
1316
logger = logging.getLogger(__name__)
1417

1518

@@ -26,6 +29,7 @@ def __init__(self, config, node_id=None, router_cls=None,
2629
if not os.path.exists(self.base_path):
2730
os.makedirs(os.path.join(self.config.default_data_dir, self.node_id))
2831
self.connection_manifest_path = os.path.join(self.base_path, "connection_manifest")
32+
self.buffer_mgr = self.config.components_buffer_manager
2933
self.stop = False
3034

3135
def _find_node_id(self):
@@ -42,7 +46,7 @@ async def watch_expire(self):
4246
while True:
4347
current_manifest = self.get_connection_manifest()
4448
for connection in current_manifest:
45-
buffer = self.config.components_buffer_manager.get_buffer_for_node(connection["id"], self)
49+
buffer = self.buffer_mgr.get_buffer_for_node(connection["id"], self)
4650
for ident, message in buffer:
4751
message_actual = json.loads(message)
4852
if "expire_time" in message_actual and message_actual['expire_time'] < time.time():
@@ -84,7 +88,7 @@ def update_connection_manifest(self, connection):
8488
manifest.append(dict(id=connection,
8589
last=time.time()))
8690
self.write_connection_manifest(manifest)
87-
91+
8892
def update_connections(self, connection):
8993
self.router.register_edge(connection.id_, self.node_id, 1)
9094
if connection.id_ in self.connections:
@@ -93,11 +97,18 @@ def update_connections(self, connection):
9397
self.connections[connection.id_] = [connection]
9498
self.update_connection_manifest(connection.id_)
9599

100+
async def message_handler(self, buf):
101+
while True:
102+
for data in buf.get():
103+
if "cmd" in data and data["cmd"] == "ROUTE":
104+
self.handle_route_advertisement(data)
105+
else:
106+
await self.handle_message(data)
107+
await asyncio.sleep(.1)
108+
96109
def add_connection(self, id_, meta, protocol_obj):
97-
buffer_mgr = self.config.components_buffer_manager
98-
conn = Connection(id_, meta, protocol_obj, buffer_mgr, self)
110+
conn = Connection(id_, meta, protocol_obj)
99111
self.update_connections(conn)
100-
return conn
101112

102113
def remove_connection(self, protocol_obj):
103114
notify_connections = []
@@ -118,3 +129,84 @@ async def shutdown_handler(self):
118129
if self.stop:
119130
return
120131
await asyncio.sleep(1)
132+
133+
def handle_route_advertisement(self, data):
134+
self.router.add_edges(data["edges"])
135+
self.send_route_advertisement(data["edges"], data["seen"])
136+
137+
def send_route_advertisement(self, edges=None, seen=[]):
138+
edges = edges or self.router.get_edges()
139+
seen = set(seen)
140+
logger.debug("Emitting Route Advertisements, excluding {}".format(seen))
141+
destinations = set(self.connections) - seen
142+
seens = list(seen | destinations | {self.node_id})
143+
144+
# TODO: This should be a broadcast call to the connection manager
145+
for target in destinations:
146+
buf = self.buffer_mgr.get_buffer_for_node(target, self)
147+
try:
148+
buf.push(json.dumps({
149+
"cmd": "ROUTE",
150+
"id": self.node_id,
151+
"capabilities": self.work_manager.get_capabilities(),
152+
"groups": self.config.node_groups,
153+
"edges": edges,
154+
"seen": seens
155+
}).encode("utf-8"))
156+
except exceptions.ReceptorBufferError as e:
157+
logger.exception("Receptor Buffer Write Error broadcasting routes and capabilities: {}".format(e))
158+
# TODO: This might should be a hard shutdown event
159+
except Exception as e:
160+
logger.exception("Error trying to broadcast routes and capabilities: {}".format(e))
161+
162+
async def handle_message(self, msg):
163+
outer_env = envelope.OuterEnvelope(**msg)
164+
next_hop = self.router.next_hop(outer_env.recipient)
165+
if next_hop:
166+
return await self.router.forward(outer_env, next_hop)
167+
168+
await outer_env.deserialize_inner(self)
169+
170+
if outer_env.inner_obj.message_type != 'directive':
171+
raise exceptions.UnknownMessageType(
172+
f'Unknown message type: {outer_env.inner_obj.message_type}')
173+
174+
try:
175+
namespace, _ = outer_env.inner_obj.directive.split(':', 1)
176+
if namespace == RECEPTOR_DIRECTIVE_NAMESPACE:
177+
await directive.control(self.router, outer_env.inner_obj)
178+
else:
179+
# other namespace/work directives
180+
await self.work_manager.handle(outer_env.inner_obj)
181+
except ValueError:
182+
logger.error("error in handle_message: Invalid directive -> '%s'. Sending failure response back." % (outer_env.inner_obj.directive,))
183+
err_resp = outer_env.inner_obj.make_response(
184+
receptor=self,
185+
recipient=outer_env.inner_obj.sender,
186+
payload="An invalid directive ('%s') was specified." % (outer_env.inner_obj.directive,),
187+
in_response_to=outer_env.inner_obj.message_id,
188+
serial=outer_env.inner_obj.serial + 1,
189+
ttl=15,
190+
code=1,
191+
)
192+
await self.router.send(err_resp)
193+
except Exception as e:
194+
logger.error("error in handle_message: '%s'. Sending failure response back." % (str(e),))
195+
err_resp = outer_env.inner_obj.make_response(
196+
receptor=self,
197+
recipient=outer_env.inner_obj.sender,
198+
payload=str(e),
199+
in_response_to=outer_env.inner_obj.message_id,
200+
serial=outer_env.inner_obj.serial + 1,
201+
ttl=15,
202+
code=1,
203+
)
204+
await self.router.send(err_resp)
205+
elif outer_env.inner_obj.message_type == 'response':
206+
in_response_to = outer_env.inner_obj.in_response_to
207+
if in_response_to in self.router.response_registry:
208+
logger.info(f'Handling response to {in_response_to} with callback.')
209+
for connection in self.controller_connections:
210+
connection.emit_response(outer_env.inner_obj)
211+
else:
212+
logger.warning(f'Received response to {in_response_to} but no record of sent message.')

receptor/router.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,14 @@ def find_edge(self, left, right):
5252
return edge
5353
return None
5454

55+
def add_edges(self, edges):
56+
for edge in edges:
57+
existing_edge = self.find_edge(edge[0], edge[1])
58+
if existing_edge and existing_edge[2] > edge[2]:
59+
self.update_node(edge[0], edge[1], edge[2])
60+
else:
61+
self.register_edge(*edge)
62+
5563
def register_edge(self, left, right, cost):
5664
if left != self.node_id:
5765
self._nodes.add(left)

0 commit comments

Comments
 (0)