Skip to content

Commit cc31c85

Browse files
committed
storing expire time as metadata rather than part of the payload
Signed-off-by: Jesse Jaggars <jjaggars@redhat.com>
1 parent fea543e commit cc31c85

2 files changed

Lines changed: 18 additions & 15 deletions

File tree

receptor/buffers/file.py

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import asyncio
2+
import datetime
23
import json
34
import logging
45
import os
5-
import time
66
import uuid
77
from concurrent.futures import ThreadPoolExecutor
88

@@ -29,9 +29,12 @@ def __init__(self, dir_, key, loop):
2929
self.q.put_nowait(item)
3030

3131
async def put(self, data):
32-
ident = str(uuid.uuid4())
33-
await self._loop.run_in_executor(pool, self._write_file, data, ident)
34-
await self.q.put(ident)
32+
item = {
33+
"ident": str(uuid.uuid4()),
34+
"expire_time": datetime.datetime.now() + datetime.timedelta(minutes=5),
35+
}
36+
await self._loop.run_in_executor(pool, self._write_file, data, item)
37+
await self.q.put(item)
3538
await self._save_manifest()
3639

3740
async def get(self, handle_only=False, delete=True):
@@ -77,30 +80,29 @@ async def _get_file(self, ident, handle_only=False, delete=True):
7780
fp = await self._loop.run_in_executor(pool, open, path, "rb")
7881
if handle_only:
7982
return fp
80-
bytes = await self._loop.run_in_executor(pool, lambda: fp.read())
83+
bytes_ = await self._loop.run_in_executor(pool, fp.read)
8184
fp.close()
8285
if delete:
8386
await self._loop.run_in_executor(pool, os.remove, path)
84-
return bytes
87+
return bytes_
8588

86-
def _write_file(self, data, ident):
87-
with open(os.path.join(self._message_path, ident), "wb") as fp:
89+
def _write_file(self, data, item):
90+
with open(os.path.join(self._message_path, item["ident"]), "wb") as fp:
8891
fp.write(data)
8992

9093
async def expire(self):
9194
async with self._manifest_lock:
9295
new_queue = asyncio.Queue()
9396
while self.q.qsize() > 0:
94-
ident = await self.q.get()
95-
data = await self._get_file(ident, handle_only=True, delete=False)
96-
# TODO: This will never work, it's not pure json anymore
97-
msg = json.load(data)
98-
if "expire_time" in msg and msg['expire_time'] < time.time():
97+
item = await self.q.get()
98+
ident = item["ident"]
99+
expire_time = item["expire_time"]
100+
if expire_time > datetime.datetime.now():
99101
logger.info("Expiring message %s", ident)
100102
# TODO: Do something with expired message
101103
await self._loop.run_in_executor(pool, os.remove, self._path_for_ident(ident))
102104
else:
103-
await new_queue.put(ident)
105+
await new_queue.put(item)
104106
self.q = new_queue
105107

106108

receptor/receptor.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import pkg_resources
21
import asyncio
32
import copy
43
import json
@@ -7,6 +6,8 @@
76
import time
87
import uuid
98

9+
import pkg_resources
10+
1011
from . import exceptions
1112
from .messages import directive, envelope
1213
from .router import MeshRouter

0 commit comments

Comments
 (0)