Skip to content

Commit 1019d9b

Browse files
committed
removing connection class
Signed-off-by: Jesse Jaggars <jjaggars@redhat.com>
1 parent 4e85828 commit 1019d9b

3 files changed

Lines changed: 45 additions & 39 deletions

File tree

receptor/connection.py

Lines changed: 0 additions & 8 deletions
This file was deleted.

receptor/protocol.py

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,31 +37,36 @@ class BaseProtocol(asyncio.Protocol):
3737
def __init__(self, receptor, loop):
3838
self.receptor = receptor
3939
self.loop = loop
40+
self.id = None
41+
self.meta = None
4042

41-
async def watch_queue(self, node, transport):
43+
def __str__(self):
44+
return f"<Connection {self.id} {self.transport}"
45+
46+
async def watch_queue(self):
4247
'''
4348
Watches the buffer for this connection for messages delivered from other
4449
parts of Receptor (forwarded messages for example) for messages to send
4550
over the connection.
4651
'''
4752
buffer_mgr = self.receptor.config.components_buffer_manager
48-
buffer_obj = buffer_mgr.get_buffer_for_node(node, self.receptor)
49-
while not transport.is_closing():
53+
buffer_obj = buffer_mgr.get_buffer_for_node(self.id, self.receptor)
54+
while not self.transport.is_closing():
5055
try:
5156
msg = buffer_obj.pop()
52-
transport.write(msg + DELIM)
57+
self.transport.write(msg + DELIM)
5358
except IndexError:
5459
await asyncio.sleep(0.1)
5560
except ReceptorBufferError as e:
5661
logger.exception("Receptor Buffer Read Error: {}".format(e))
5762
# TODO: We need to try to send this message along somewhere else
5863
# and record the failure somewhere
59-
transport.close()
64+
self.transport.close()
6065
return
6166
except Exception as e:
62-
logger.exception("Error received trying to write to {}: {}".format(node, e))
67+
logger.exception("Error received trying to write to {}: {}".format(self.id, e))
6368
buffer_obj.push(msg)
64-
transport.close()
69+
self.transport.close()
6570
return
6671

6772
def connection_made(self, transport):
@@ -98,8 +103,10 @@ async def wait_greeting(self):
98103

99104
def handle_handshake(self, data):
100105
self.greeted = True
101-
self.receptor.add_connection(data["id"], data.get("meta", {}), self)
102-
self.loop.create_task(self.watch_queue(data["id"], self.transport))
106+
self.id = data["id"]
107+
self.meta = data.get("meta", {})
108+
self.receptor.add_connection(self)
109+
self.loop.create_task(self.watch_queue())
103110
self.loop.create_task(self.receptor.message_handler(self.incoming_buffer))
104111

105112
def send_handshake(self):

receptor/receptor.py

Lines changed: 29 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -89,13 +89,13 @@ def update_connection_manifest(self, connection):
8989
last=time.time()))
9090
self.write_connection_manifest(manifest)
9191

92-
def update_connections(self, connection):
93-
self.router.register_edge(connection.id_, self.node_id, 1)
94-
if connection.id_ in self.connections:
95-
self.connections[connection.id_].append(connection)
92+
def update_connections(self, protocol_obj):
93+
self.router.register_edge(protocol_obj.id, self.node_id, 1)
94+
if protocol_obj.id in self.connections:
95+
self.connections[protocol_obj.id].append(protocol_obj)
9696
else:
97-
self.connections[connection.id_] = [connection]
98-
self.update_connection_manifest(connection.id_)
97+
self.connections[protocol_obj.id] = [protocol_obj]
98+
self.update_connection_manifest(protocol_obj.id)
9999

100100
async def message_handler(self, buf):
101101
while True:
@@ -106,9 +106,8 @@ async def message_handler(self, buf):
106106
await self.handle_message(data)
107107
await asyncio.sleep(.1)
108108

109-
def add_connection(self, id_, meta, protocol_obj):
110-
conn = Connection(id_, meta, protocol_obj)
111-
self.update_connections(conn)
109+
def add_connection(self, protocol_obj):
110+
self.update_connections(protocol_obj)
112111

113112
def remove_connection(self, protocol_obj):
114113
notify_connections = []
@@ -159,18 +158,7 @@ def send_route_advertisement(self, edges=None, seen=[]):
159158
except Exception as e:
160159
logger.exception("Error trying to broadcast routes and capabilities: {}".format(e))
161160

162-
async def handle_message(self, msg):
163-
outer_env = envelope.OuterEnvelope(**msg)
164-
next_hop = self.router.next_hop(outer_env.recipient)
165-
if next_hop:
166-
return await self.router.forward(outer_env, next_hop)
167-
168-
await outer_env.deserialize_inner(self)
169-
170-
if outer_env.inner_obj.message_type != 'directive':
171-
raise exceptions.UnknownMessageType(
172-
f'Unknown message type: {outer_env.inner_obj.message_type}')
173-
161+
async def handle_directive(self, outer_env):
174162
try:
175163
namespace, _ = outer_env.inner_obj.directive.split(':', 1)
176164
if namespace == RECEPTOR_DIRECTIVE_NAMESPACE:
@@ -202,11 +190,30 @@ async def handle_message(self, msg):
202190
code=1,
203191
)
204192
await self.router.send(err_resp)
205-
elif outer_env.inner_obj.message_type == 'response':
193+
194+
async def handle_response(self, outer_env):
206195
in_response_to = outer_env.inner_obj.in_response_to
207196
if in_response_to in self.router.response_registry:
208197
logger.info(f'Handling response to {in_response_to} with callback.')
209198
for connection in self.controller_connections:
210199
connection.emit_response(outer_env.inner_obj)
211200
else:
212201
logger.warning(f'Received response to {in_response_to} but no record of sent message.')
202+
203+
async def handle_message(self, msg):
204+
handlers = dict(
205+
directive=self.handle_directive,
206+
response=self.handle_response,
207+
)
208+
outer_env = envelope.OuterEnvelope(**msg)
209+
next_hop = self.router.next_hop(outer_env.recipient)
210+
if next_hop:
211+
return await self.router.forward(outer_env, next_hop)
212+
213+
await outer_env.deserialize_inner(self)
214+
215+
if outer_env.inner_obj.message_type not in handlers:
216+
raise exceptions.UnknownMessageType(
217+
f'Unknown message type: {outer_env.inner_obj.message_type}')
218+
219+
await handlers[outer_env.inner_obj.message_type](outer_env)

0 commit comments

Comments
 (0)