1- import logging
2-
31import asyncio
4- from collections . abc import AsyncIterator
2+ import logging
53from abc import abstractmethod , abstractproperty
4+ from collections .abc import AsyncIterator
65
76from ..messages .envelope import FramedBuffer
87
@@ -24,23 +23,27 @@ async def send(self, bytes_):
2423
2524
2625async def watch_queue (conn , buf ):
27- while not conn .closed :
28- try :
29- msg = await asyncio .wait_for (buf .get (), 5.0 )
30- if not msg :
26+ try :
27+ while not conn .closed :
28+ try :
29+ msg = await asyncio .wait_for (buf .get (), 5.0 )
30+ if not msg :
31+ return await conn .close ()
32+ except asyncio .TimeoutError :
33+ continue
34+ except Exception :
35+ logger .exception ("watch_queue: error getting data from buffer" )
36+ continue
37+
38+ try :
39+ await conn .send (msg )
40+ except Exception :
41+ logger .exception ("watch_queue: error received trying to write" )
42+ await buf .put (msg )
3143 return await conn .close ()
32- except asyncio .TimeoutError :
33- continue
34- except Exception :
35- logger .exception ("watch_queue: error getting data from buffer" )
36- continue
37-
38- try :
39- await conn .send (msg )
40- except Exception :
41- logger .exception ("watch_queue: error received trying to write" )
42- await buf .put (msg )
43- return await conn .close ()
44+ except asyncio .CancelledError :
45+ logger .debug ("watch_queue: cancel request received" )
46+ await conn .close ()
4447
4548
4649class Worker :
@@ -63,6 +66,8 @@ async def receive(self):
6366 if self .conn .closed :
6467 break
6568 await self .buf .put (msg )
69+ except asyncio .CancelledError :
70+ logger .debug ("receive: cancel request received" )
6671 except Exception :
6772 logger .exception ("receive" )
6873
0 commit comments