Skip to content

Commit d3ed0b6

Browse files
committed
adding an async file-backed buffer class
Signed-off-by: Jesse Jaggars <jjaggars@redhat.com>
1 parent c235058 commit d3ed0b6

1 file changed

Lines changed: 68 additions & 3 deletions

File tree

receptor/buffers/file.py

Lines changed: 68 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,77 @@
1-
import logging
2-
import uuid
1+
import asyncio
32
import json
3+
import logging
44
import os
5+
import uuid
6+
from collections import deque
7+
from concurrent.futures import ThreadPoolExecutor
58

6-
from .base import BaseBufferManager, BaseBuffer
79
from ..exceptions import ReceptorBufferError
10+
from .base import BaseBuffer, BaseBufferManager
811

912
logger = logging.getLogger(__name__)
13+
pool = ThreadPoolExecutor()
14+
15+
16+
class DurableBuffer:
17+
18+
def __init__(self, dir_, key, loop):
19+
self.q = asyncio.Queue()
20+
self._base_path = os.path.join(os.path.expanduser(dir_))
21+
self._message_path = os.path.join(self._base_path, "messages")
22+
self._manifest_path = os.path.join(self._base_path, f"manifest-{key}")
23+
self._loop = loop
24+
self._manifest_lock = asyncio.Lock(loop=self._loop)
25+
try:
26+
os.makedirs(self._message_path, mode=0o700)
27+
except Exception:
28+
pass
29+
# We are setting the internal queue for the asyncio Queue to have a
30+
# default from what we have in the manifest. This relies on an
31+
# implementation detail of asyncio.Queue because there doesn't appear
32+
# to be a way to set initial state otherwise
33+
self.q._queue = deque(self._read_manifest())
34+
35+
async def put(self, data):
36+
async with self._manifest_lock:
37+
ident = str(uuid.uuid4())
38+
await self._loop.run_in_executor(pool, self._write_file, data, ident)
39+
await self.q.put(ident)
40+
await self._save_manifest()
41+
42+
async def get(self, handle_only=False):
43+
async with self._manifest_lock:
44+
msg = await self.q.get()
45+
await self._save_manifest()
46+
return await self._get_file(msg, handle_only=handle_only)
47+
48+
async def _save_manifest(self):
49+
await self._loop.run_in_executor(pool, self._write_manifest)
50+
51+
def _write_manifest(self):
52+
with open(self._manifest_path, "w") as fp:
53+
json.dump(list(self.q._queue), fp)
54+
55+
def _read_manifest(self):
56+
try:
57+
with open(self._manifest_path, "r") as fp:
58+
return json.load(fp)
59+
except FileNotFoundError:
60+
return []
61+
62+
async def _get_file(self, path, handle_only=False):
63+
path = os.path.join(self._message_path, path)
64+
fp = await self._loop.run_in_executor(pool, open, path, "rb")
65+
if handle_only:
66+
return fp
67+
bytes = await self._loop.run_in_executor(pool, lambda: fp.read())
68+
fp.close()
69+
return bytes
1070

71+
def _write_file(self, data, ident):
72+
with open(os.path.join(self._message_path, ident), "wb") as fp:
73+
fp.write(data)
74+
1175

1276
class FileBufferManager(BaseBufferManager):
1377
_buffers = {}
@@ -21,6 +85,7 @@ class FileBuffer(BaseBuffer):
2185
def __init__(self, node_id, receptor):
2286
super().__init__(node_id, receptor)
2387
self.node_id = node_id
88+
self.loop = asyncio.get_event_loop()
2489
self.base_path = os.path.join(os.path.expanduser(self.receptor.config.default_data_dir))
2590
self.message_path = os.path.join(self.base_path, self.receptor.node_id, "messages")
2691
self.manifest_path = os.path.join(self.base_path, self.receptor.node_id, "manifest-{}".format(node_id))

0 commit comments

Comments
 (0)