Skip to content

Commit d7b9196

Browse files
committed
Fix socket thrashing on disconnect
The underlying transport doesn't seem to realize the connection has gone away underneath it. This detects the scenario and closes the connection.
1 parent 33b8557 commit d7b9196

5 files changed

Lines changed: 10 additions & 4 deletions

File tree

receptor/buffers/file.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ async def expire(self):
9393
while self.q.qsize() > 0:
9494
ident = await self.q.get()
9595
data = await self._get_file(ident, handle_only=True, delete=False)
96+
# TODO: This will never work, it's not pure json anymore
9697
msg = json.load(data)
9798
if "expire_time" in msg and msg['expire_time'] < time.time():
9899
logger.info("Expiring message %s", ident)

receptor/connection/base.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ async def watch_queue(conn, buf):
2727
while not conn.closed:
2828
try:
2929
msg = await asyncio.wait_for(buf.get(), 5.0)
30+
if not msg:
31+
return await conn.close()
3032
except asyncio.TimeoutError:
3133
continue
3234
except Exception:
@@ -58,6 +60,8 @@ def start_receiving(self):
5860
async def receive(self):
5961
try:
6062
async for msg in self.conn:
63+
if self.conn.closed:
64+
break
6165
await self.buf.put(msg)
6266
except Exception:
6367
logger.exception("receive")

receptor/connection/sock.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,17 @@ def __init__(self, reader, writer, chunk_size=2 ** 8):
1414

1515
async def __anext__(self):
1616
bytes_ = await self.reader.read(self.chunk_size)
17+
if not bytes_:
18+
self.close()
1719
return bytes_
1820

1921
@property
2022
def closed(self):
2123
return self._closed
2224

23-
async def close(self):
25+
def close(self):
2426
self._closed = True
25-
await self.writer.close()
27+
self.writer.close()
2628

2729
async def send(self, bytes_):
2830
self.writer.write(bytes_)

receptor/entrypoints.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,6 @@ def run_as_controller(config):
4545
logger.info(f'Starting stats on port {config.node_stats_port}')
4646
start_http_server(config.controller_stats_port)
4747
controller.enable_server(config.controller_listen)
48-
if config.controller_websocket_listen:
49-
controller.enable_websocket_server(config.controller_websocket_listen)
5048
controller.loop.create_task(controller.receptor.watch_expire())
5149
controller.run()
5250

receptor/receptor.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ async def message_handler(self, buf):
9191
data = await buf.get()
9292
except Exception:
9393
logger.exception("message_handler")
94+
break
9495
else:
9596
logger.debug("message_handler: %s", data)
9697
if "cmd" in data.header and data.header["cmd"] == "ROUTE":

0 commit comments

Comments
 (0)