Skip to content

Commit 661bd99

Browse files
authored
Merge pull request #71 from jhjaggars/fix-buffer-expire
storing expire time as metadata rather than part of the payload
2 parents fea543e + 23b01ea commit 661bd99

2 files changed

Lines changed: 44 additions & 18 deletions

File tree

receptor/buffers/file.py

Lines changed: 42 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import asyncio
2+
import datetime
23
import json
34
import logging
45
import os
5-
import time
66
import uuid
77
from concurrent.futures import ThreadPoolExecutor
88

@@ -12,6 +12,29 @@
1212
pool = ThreadPoolExecutor()
1313

1414

15+
class ManifestEncoder(json.JSONEncoder):
16+
def default(self, o):
17+
if isinstance(o, datetime.datetime):
18+
return {
19+
"_type": "datetime.datetime",
20+
"value": o.isoformat(),
21+
}
22+
return super().default(o)
23+
24+
25+
class ManifestDecoder(json.JSONDecoder):
26+
def __init__(self, *args, **kwargs):
27+
super().__init__(object_hook=self.object_hook, *args, **kwargs)
28+
29+
def object_hook(self, o):
30+
type_ = o.get("_type")
31+
if type_ != "datetime.datetime":
32+
return o
33+
34+
t = o['_type']
35+
return datetime.datetime.fromisoformat(o["value"])
36+
37+
1538
class DurableBuffer:
1639

1740
def __init__(self, dir_, key, loop):
@@ -29,17 +52,20 @@ def __init__(self, dir_, key, loop):
2952
self.q.put_nowait(item)
3053

3154
async def put(self, data):
32-
ident = str(uuid.uuid4())
33-
await self._loop.run_in_executor(pool, self._write_file, data, ident)
34-
await self.q.put(ident)
55+
item = {
56+
"ident": str(uuid.uuid4()),
57+
"expire_time": datetime.datetime.utcnow() + datetime.timedelta(minutes=5),
58+
}
59+
await self._loop.run_in_executor(pool, self._write_file, data, item)
60+
await self.q.put(item)
3561
await self._save_manifest()
3662

3763
async def get(self, handle_only=False, delete=True):
3864
while True:
3965
msg = await self.q.get()
4066
await self._save_manifest()
4167
try:
42-
return await self._get_file(msg, handle_only=handle_only, delete=delete)
68+
return await self._get_file(msg["ident"], handle_only=handle_only, delete=delete)
4369
except FileNotFoundError:
4470
pass
4571

@@ -49,12 +75,12 @@ async def _save_manifest(self):
4975

5076
def _write_manifest(self):
5177
with open(self._manifest_path, "w") as fp:
52-
json.dump(list(self.q._queue), fp)
78+
json.dump(list(self.q._queue), fp, cls=ManifestEncoder)
5379

5480
def _read_manifest(self):
5581
try:
5682
with open(self._manifest_path, "r") as fp:
57-
return json.load(fp)
83+
return json.load(fp, cls=ManifestDecoder)
5884
except FileNotFoundError:
5985
return []
6086
except json.decoder.JSONDecodeError:
@@ -77,30 +103,29 @@ async def _get_file(self, ident, handle_only=False, delete=True):
77103
fp = await self._loop.run_in_executor(pool, open, path, "rb")
78104
if handle_only:
79105
return fp
80-
bytes = await self._loop.run_in_executor(pool, lambda: fp.read())
106+
bytes_ = await self._loop.run_in_executor(pool, fp.read)
81107
fp.close()
82108
if delete:
83109
await self._loop.run_in_executor(pool, os.remove, path)
84-
return bytes
110+
return bytes_
85111

86-
def _write_file(self, data, ident):
87-
with open(os.path.join(self._message_path, ident), "wb") as fp:
112+
def _write_file(self, data, item):
113+
with open(os.path.join(self._message_path, item["ident"]), "wb") as fp:
88114
fp.write(data)
89115

90116
async def expire(self):
91117
async with self._manifest_lock:
92118
new_queue = asyncio.Queue()
93119
while self.q.qsize() > 0:
94-
ident = await self.q.get()
95-
data = await self._get_file(ident, handle_only=True, delete=False)
96-
# TODO: This will never work, it's not pure json anymore
97-
msg = json.load(data)
98-
if "expire_time" in msg and msg['expire_time'] < time.time():
120+
item = await self.q.get()
121+
ident = item["ident"]
122+
expire_time = item["expire_time"]
123+
if expire_time < datetime.datetime.utcnow():
99124
logger.info("Expiring message %s", ident)
100125
# TODO: Do something with expired message
101126
await self._loop.run_in_executor(pool, os.remove, self._path_for_ident(ident))
102127
else:
103-
await new_queue.put(ident)
128+
await new_queue.put(item)
104129
self.q = new_queue
105130

106131

receptor/receptor.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import pkg_resources
21
import asyncio
32
import copy
43
import json
@@ -7,6 +6,8 @@
76
import time
87
import uuid
98

9+
import pkg_resources
10+
1011
from . import exceptions
1112
from .messages import directive, envelope
1213
from .router import MeshRouter

0 commit comments

Comments
 (0)