@@ -39,6 +39,7 @@ def __init__(self, receptor, loop):
3939 self .write_task = None
4040
4141 def start_receiving (self , ws ):
42+ logger .debug ("starting recv" )
4243 self .read_task = self .loop .create_task (self .receive (ws ))
4344
4445 async def receive (self , ws ):
@@ -62,33 +63,35 @@ def _cancel(self, task):
6263 task .cancel ()
6364
6465 async def hello (self , ws ):
66+ logger .debug ("sending HI" )
6567 msg = self .receptor ._say_hi ().serialize ()
6668 await ws .send_bytes (msg )
6769
6870 async def start_processing (self , ws ):
71+ logger .debug ("sending routes" )
72+ await self .receptor .send_route_advertisement ()
73+ logger .debug ("starting normal loop" )
6974 self .handle_task = self .loop .create_task (self .receptor .message_handler (self .buf ))
7075 out = self .receptor .buffer_mgr .get_buffer_for_node (
7176 self .remote_id , self .receptor
7277 )
7378 self .write_task = self .loop .create_task (watch_queue (ws , out ))
7479 return await self .write_task
7580
81+ async def _wait_handshake (self , ws ):
82+ logger .debug ("serve: waiting for HI" )
83+ response = await self .buf .get () # TODO: deal with timeout
84+ self .remote_id = response .header ["id" ]
85+ self .register (ws )
86+
7687
7788class WSClient (WSBase ):
7889 async def connect (self , uri ):
7990 async with aiohttp .ClientSession ().ws_connect (uri ) as ws :
8091 try :
81- logger .debug ("connect: starting recv" )
8292 self .start_receiving (ws )
83- logger .debug ("connect: sending HI" )
8493 await self .hello (ws )
85- logger .debug ("connect: waiting for HI" )
86- response = await self .buf .get () # TODO: deal with timeout
87- self .remote_id = response .header ["id" ]
88- self .register (ws )
89- logger .debug ("connect: sending routes" )
90- await self .receptor .send_route_advertisement ()
91- logger .debug ("connect: starting normal loop" )
94+ await self ._wait_handshake (ws )
9295 await self .start_processing (ws )
9396 logger .debug ("connect: normal exit" )
9497 except Exception :
@@ -106,19 +109,13 @@ async def serve(self, request):
106109 ws = aiohttp .web .WebSocketResponse ()
107110 await ws .prepare (request )
108111
109- logger .debug ("serve: starting recv" )
110- self .loop .create_task (self .receive (ws )) # reader
111- logger .debug ("serve: waiting for HI" )
112- response = await self .buf .get () # TODO: deal with timeout
113- self .remote_id = response .header ["id" ]
114- self .register (ws )
115- logger .debug ("serve: sending HI" )
116- await self .hello (ws )
117- logger .debug ("serve: sending routes" )
118- await self .receptor .send_route_advertisement ()
119- logger .debug ("serve: starting normal recv loop" )
120- await self .start_processing (ws )
121- self .unregister (ws )
112+ try :
113+ self .start_receiving (ws )
114+ await self ._wait_handshake (ws )
115+ await self .hello (ws )
116+ await self .start_processing (ws )
117+ finally :
118+ self .unregister (ws )
122119
123120 def app (self ):
124121 app = aiohttp .web .Application ()
0 commit comments