Skip to content

Commit f135ae9

Browse files
committed
adding tests and delete behavior
Signed-off-by: Jesse Jaggars <jjaggars@redhat.com>
1 parent d3ed0b6 commit f135ae9

2 files changed

Lines changed: 70 additions & 5 deletions

File tree

receptor/buffers/file.py

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,15 @@ async def put(self, data):
3939
await self.q.put(ident)
4040
await self._save_manifest()
4141

42-
async def get(self, handle_only=False):
42+
async def get(self, handle_only=False, delete=True):
4343
async with self._manifest_lock:
44-
msg = await self.q.get()
45-
await self._save_manifest()
46-
return await self._get_file(msg, handle_only=handle_only)
44+
while True:
45+
msg = await self.q.get()
46+
await self._save_manifest()
47+
try:
48+
return await self._get_file(msg, handle_only=handle_only, delete=delete)
49+
except FileNotFoundError:
50+
pass
4751

4852
async def _save_manifest(self):
4953
await self._loop.run_in_executor(pool, self._write_manifest)
@@ -59,13 +63,22 @@ def _read_manifest(self):
5963
except FileNotFoundError:
6064
return []
6165

62-
async def _get_file(self, path, handle_only=False):
66+
async def _get_file(self, path, handle_only=False, delete=True):
67+
"""
68+
Retrieves a file from disk. If handle_only is True then we will
69+
return the handle to the file and do nothing else. Otherwise the file
70+
is read into memory all at once and returned. If delete is True (the
71+
default) and handle_only is False (the default) then the underlying
72+
file will be removed as well.
73+
"""
6374
path = os.path.join(self._message_path, path)
6475
fp = await self._loop.run_in_executor(pool, open, path, "rb")
6576
if handle_only:
6677
return fp
6778
bytes = await self._loop.run_in_executor(pool, lambda: fp.read())
6879
fp.close()
80+
if delete:
81+
os.remove(path)
6982
return bytes
7083

7184
def _write_file(self, data, ident):
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
import os
2+
import shutil
3+
import tempfile
4+
5+
import pytest
6+
7+
from receptor.buffers.file import DurableBuffer
8+
9+
10+
@pytest.yield_fixture
11+
def tempdir():
12+
dir_ = tempfile.mkdtemp()
13+
yield dir_
14+
shutil.rmtree(dir_)
15+
16+
17+
@pytest.mark.asyncio
18+
async def test_create(event_loop, tempdir):
19+
b = DurableBuffer(tempdir, "test_create", event_loop)
20+
await b.put(b"some data")
21+
assert await b.get() == b"some data"
22+
23+
24+
@pytest.mark.asyncio
25+
async def test_manifest(event_loop, tempdir):
26+
b = DurableBuffer(tempdir, "test_manifest", event_loop)
27+
await b.put(b"one")
28+
await b.put(b"two")
29+
await b.put(b"three")
30+
assert b.q.qsize() == 3
31+
32+
assert len(b._read_manifest()) == 3
33+
34+
35+
@pytest.mark.asyncio
36+
async def test_unreadable_file(event_loop, tempdir):
37+
b = DurableBuffer(tempdir, "test_unreadable_file", event_loop)
38+
b.q._queue.appendleft("junk")
39+
await b.put(b"valid data")
40+
data = await b.get()
41+
assert data == b"valid data"
42+
assert b.q.empty()
43+
44+
45+
@pytest.mark.asyncio
46+
async def test_deletes_messages(event_loop, tempdir):
47+
b = DurableBuffer(tempdir, "test_deletes_messages", event_loop)
48+
await b.put(b"some data")
49+
ident = b.q._queue[0]
50+
assert await b.get() == b"some data"
51+
filepath = os.path.join(b._message_path, ident)
52+
assert not os.path.exists(filepath)

0 commit comments

Comments
 (0)