Skip to content

Commit 98009be

Browse files
authored
Merge pull request #64 from matburt/fix_socket_thrashing
Fix socket thrashing on disconnect
2 parents 33b8557 + d7b9196 commit 98009be

5 files changed

Lines changed: 10 additions & 4 deletions

File tree

receptor/buffers/file.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ async def expire(self):
9393
while self.q.qsize() > 0:
9494
ident = await self.q.get()
9595
data = await self._get_file(ident, handle_only=True, delete=False)
96+
# TODO: This will never work, it's not pure json anymore
9697
msg = json.load(data)
9798
if "expire_time" in msg and msg['expire_time'] < time.time():
9899
logger.info("Expiring message %s", ident)

receptor/connection/base.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ async def watch_queue(conn, buf):
2727
while not conn.closed:
2828
try:
2929
msg = await asyncio.wait_for(buf.get(), 5.0)
30+
if not msg:
31+
return await conn.close()
3032
except asyncio.TimeoutError:
3133
continue
3234
except Exception:
@@ -58,6 +60,8 @@ def start_receiving(self):
5860
async def receive(self):
5961
try:
6062
async for msg in self.conn:
63+
if self.conn.closed:
64+
break
6165
await self.buf.put(msg)
6266
except Exception:
6367
logger.exception("receive")

receptor/connection/sock.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,17 @@ def __init__(self, reader, writer, chunk_size=2 ** 8):
1414

1515
async def __anext__(self):
1616
bytes_ = await self.reader.read(self.chunk_size)
17+
if not bytes_:
18+
self.close()
1719
return bytes_
1820

1921
@property
2022
def closed(self):
2123
return self._closed
2224

23-
async def close(self):
25+
def close(self):
2426
self._closed = True
25-
await self.writer.close()
27+
self.writer.close()
2628

2729
async def send(self, bytes_):
2830
self.writer.write(bytes_)

receptor/entrypoints.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,6 @@ def run_as_controller(config):
4545
logger.info(f'Starting stats on port {config.node_stats_port}')
4646
start_http_server(config.controller_stats_port)
4747
controller.enable_server(config.controller_listen)
48-
if config.controller_websocket_listen:
49-
controller.enable_websocket_server(config.controller_websocket_listen)
5048
controller.loop.create_task(controller.receptor.watch_expire())
5149
controller.run()
5250

receptor/receptor.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ async def message_handler(self, buf):
9191
data = await buf.get()
9292
except Exception:
9393
logger.exception("message_handler")
94+
break
9495
else:
9596
logger.debug("message_handler: %s", data)
9697
if "cmd" in data.header and data.header["cmd"] == "ROUTE":

0 commit comments

Comments
 (0)