Skip to content

Commit 23208bb

Browse files
committed
Fixing up some bugs with disconnection handling
1 parent 34e5127 commit 23208bb

3 files changed

Lines changed: 20 additions & 11 deletions

File tree

receptor/controller.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ def mainloop(receptor, socket_path, loop=asyncio.get_event_loop()):
3535
)
3636
logger.info(f'Opening control socket on {socket_path}')
3737
loop.create_task(control_listener)
38+
loop.create_task(receptor.watch_expire())
3839
try:
3940
loop.run_forever()
4041
except KeyboardInterrupt:

receptor/protocol.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,11 @@ def __init__(self, receptor, loop):
3535
self.loop = loop
3636

3737
async def watch_queue(self, node, transport):
38+
'''
39+
Watches the buffer for this connection for messages delivered from other
40+
parts of Receptor (forwarded messages for example) for messages to send
41+
over the connection.
42+
'''
3843
buffer_mgr = self.receptor.config.components.buffer_manager
3944
buffer_obj = buffer_mgr.get_buffer_for_node(node, self.receptor)
4045
while not transport.is_closing():
@@ -63,13 +68,17 @@ def connection_made(self, transport):
6368
self.loop.create_task(self.wait_greeting())
6469

6570
def connection_lost(self, exc):
66-
self.receptor.remove_connection(self)
71+
self.receptor.remove_connection(self.connection)
6772

6873
def data_received(self, data):
6974
logger.debug(data)
7075
self.incoming_buffer.add(data)
7176

7277
async def wait_greeting(self):
78+
'''
79+
Initialized when the connection is established to handle the greeting
80+
before transitioning to message processing.
81+
'''
7382
while not self.greeted:
7483
logger.debug('Looking for handshake...')
7584
for data in self.incoming_buffer.get():
@@ -80,9 +89,10 @@ async def wait_greeting(self):
8089
else:
8190
logger.error("Handshake failed!")
8291
# TODO: Trigger disconnection
92+
# otherwise we could get stuck here
8393
await asyncio.sleep(.1)
8494
logger.debug("handshake complete, starting normal handle loop")
85-
self.loop.create_task(self.connection.message_handler(self.incoming_buffer)) # Duplicated?
95+
self.loop.create_task(self.connection.message_handler(self.incoming_buffer)) # Duplicated (see handle_handshake)?
8696

8797
def handle_handshake(self, data):
8898
self.greeted = True

receptor/receptor.py

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,12 @@ def _find_node_id(self):
4040

4141
async def watch_expire(self):
4242
while True:
43-
logger.info("Checking expirations")
4443
current_manifest = self.get_connection_manifest()
4544
for connection in current_manifest:
4645
buffer = self.config.components.buffer_manager.get_buffer_for_node(connection["id"], self)
4746
for ident, message in buffer:
4847
message_actual = json.loads(message)
49-
logger.info("Examining {}".format(message_data))
50-
if "expire_time" in message_data and message_data['expire_time'] < time.time():
48+
if "expire_time" in message_actual and message_actual['expire_time'] < time.time():
5149
logger.info("Expiring message {}:{}".format(ident, connection["id"]))
5250
expired_message = buffer.read_message(ident, remove=True)
5351
# TODO: Do something with expired message
@@ -83,7 +81,7 @@ def update_connection_manifest(self, connection):
8381
found = True
8482
break
8583
if not found:
86-
manifest.append(dict(id=connection.id_,
84+
manifest.append(dict(id=connection,
8785
last=time.time()))
8886
self.write_connection_manifest(manifest)
8987

@@ -102,19 +100,19 @@ def add_connection(self, id_, protocol_obj):
102100
return conn
103101

104102
def remove_connection(self, protocol_obj):
105-
notify_protocols = []
106-
#self.update_connection_manifest(conn)
103+
notify_connections = []
107104
for connection_node in self.connections:
108105
if protocol_obj in self.connections[connection_node]:
109106
logger.info("Removing connection {} for node {}".format(protocol_obj, connection_node))
110107
self.update_connection_manifest(connection_node)
111108
self.connections[connection_node].remove(protocol_obj)
112109
self.router.update_node(self.node_id, connection_node, 100)
113110
self.router.debug_router()
114-
notify_protocols += self.connections[connection_node]
111+
self.update_connection_manifest(connection_node)
112+
notify_connections += self.connections[connection_node]
115113
# TODO: Broadcast update, set timer for full expiration
116-
for active_protocol in notify_protocols:
117-
active_protocol.send_route_advertisement(self.router.get_edges())
114+
for active_connection in notify_connections:
115+
active_connection.send_route_advertisement(self.router.get_edges())
118116

119117
async def shutdown_handler(self):
120118
while True:

0 commit comments

Comments
 (0)