File tree Expand file tree Collapse file tree
Expand file tree Collapse file tree Original file line number Diff line number Diff line change 1- import collections
21import logging
32import uuid
43import json
Original file line number Diff line number Diff line change 11import datetime
22import asyncio
33import logging
4+ import time
45import json
56import uuid
67from collections import deque
@@ -88,8 +89,7 @@ async def wait_greeting(self):
8889 break
8990 else :
9091 logger .error ("Handshake failed!" )
91- # TODO: Trigger disconnection
92- # otherwise we could get stuck here
92+ self .transport .close ()
9393 await asyncio .sleep (.1 )
9494 logger .debug ("handshake complete, starting normal handle loop" )
9595 self .loop .create_task (self .connection .message_handler (self .incoming_buffer )) # Duplicated (see handle_handshake)?
@@ -104,6 +104,7 @@ def send_handshake(self):
104104 msg = json .dumps ({
105105 "cmd" : "HI" ,
106106 "id" : self .receptor .node_id ,
107+ "expire_time" : time .time () + 10 ,
107108 }).encode ("utf-8" )
108109 self .transport .write (msg + DELIM )
109110
@@ -185,7 +186,6 @@ def data_received(self, data):
185186 raw_payload = payload ,
186187 directive = directive
187188 )
188- # TODO: Response expiration task?
189189 # TODO: Persistent registry?
190190 self .loop .create_task (self .receptor .router .send (inner_env ,
191191 expected_response = True ))
Original file line number Diff line number Diff line change @@ -46,8 +46,8 @@ async def watch_expire(self):
4646 for ident , message in buffer :
4747 message_actual = json .loads (message )
4848 if "expire_time" in message_actual and message_actual ['expire_time' ] < time .time ():
49+ buffer .read_message (ident , remove = True )
4950 logger .info ("Expiring message {}:{}" .format (ident , connection ["id" ]))
50- expired_message = buffer .read_message (ident , remove = True )
5151 # TODO: Do something with expired message
5252 if connection ["last" ] + 86400 < time .time ():
5353 logger .info ("Expiring connection {}" .format (connection ["id" ]))
@@ -110,7 +110,6 @@ def remove_connection(self, protocol_obj):
110110 self .router .debug_router ()
111111 self .update_connection_manifest (connection_node )
112112 notify_connections += self .connections [connection_node ]
113- # TODO: Broadcast update, set timer for full expiration
114113 for active_connection in notify_connections :
115114 active_connection .send_route_advertisement (self .router .get_edges ())
116115
You can’t perform that action at this time.
0 commit comments