Skip to content

Commit 4eb0b77

Browse files
committed
adding an expire method to the buffer
Signed-off-by: Jesse Jaggars <jjaggars@redhat.com>
1 parent 525762d commit 4eb0b77

2 files changed

Lines changed: 26 additions & 9 deletions

File tree

receptor/buffers/file.py

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import json
33
import logging
44
import os
5+
import time
56
import uuid
67
from collections import deque
78
from concurrent.futures import ThreadPoolExecutor
@@ -61,27 +62,48 @@ def _read_manifest(self):
6162
except FileNotFoundError:
6263
return []
6364

64-
async def _get_file(self, path, handle_only=False, delete=True):
65+
def _path_for_ident(self, ident):
66+
return os.path.join(self._message_path, ident)
67+
68+
async def _get_file(self, ident, handle_only=False, delete=True):
6569
"""
6670
Retrieves a file from disk. If handle_only is True then we will
6771
return the handle to the file and do nothing else. Otherwise the file
6872
is read into memory all at once and returned. If delete is True (the
6973
default) and handle_only is False (the default) then the underlying
7074
file will be removed as well.
7175
"""
72-
path = os.path.join(self._message_path, path)
76+
path = self._path_for_ident(ident)
7377
fp = await self._loop.run_in_executor(pool, open, path, "rb")
7478
if handle_only:
7579
return fp
7680
bytes = await self._loop.run_in_executor(pool, lambda: fp.read())
7781
fp.close()
7882
if delete:
79-
os.remove(path)
83+
await self._loop.run_in_executor(pool, os.remove, path)
8084
return bytes
8185

8286
def _write_file(self, data, ident):
8387
with open(os.path.join(self._message_path, ident), "wb") as fp:
8488
fp.write(data)
89+
90+
async def expire(self):
91+
async with self._manifest_lock:
92+
new_queue = asyncio.Queue()
93+
while self.q.qsize() > 0:
94+
ident = await self.q.get()
95+
data = await self._get_file(ident, handle_only=True, delete=False)
96+
msg = json.load(data)
97+
if "expire_time" in msg and msg['expire_time'] < time.time():
98+
logger.info("Expiring message %s", ident)
99+
# TODO: Do something with expired message
100+
await self._loop.run_in_executor(pool, os.remove, self._path_for_ident(ident))
101+
else:
102+
await new_queue.put(ident)
103+
self.q = new_queue
104+
105+
106+
85107

86108

87109
class FileBufferManager(BaseBufferManager):

receptor/receptor.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,7 @@ async def watch_expire(self):
4747
current_manifest = self.get_connection_manifest()
4848
for connection in current_manifest:
4949
buffer = self.buffer_mgr.get_buffer_for_node(connection["id"], self)
50-
for ident, message in buffer:
51-
message_actual = json.loads(message)
52-
if "expire_time" in message_actual and message_actual['expire_time'] < time.time():
53-
buffer.read_message(ident, remove=True)
54-
logger.info("Expiring message {}:{}".format(ident, connection["id"]))
55-
# TODO: Do something with expired message
50+
await buffer.expire()
5651
if connection["last"] + 86400 < time.time():
5752
logger.info("Expiring connection {}".format(connection["id"]))
5853
write_manifest = copy.copy(current_manifest)

0 commit comments

Comments
 (0)