Skip to content

Commit c5cf514

Browse files
authored
Merge pull request #40 from jhjaggars/async-buffers
Adding Async buffers
2 parents 8b3cfb3 + 3d641e3 commit c5cf514

7 files changed

Lines changed: 213 additions & 207 deletions

File tree

receptor/buffers/file.py

Lines changed: 99 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -1,102 +1,113 @@
1-
import logging
2-
import uuid
1+
import asyncio
32
import json
3+
import logging
44
import os
5+
import time
6+
import uuid
7+
from concurrent.futures import ThreadPoolExecutor
58

6-
from .base import BaseBufferManager, BaseBuffer
7-
from ..exceptions import ReceptorBufferError
9+
from .base import BaseBufferManager
810

911
logger = logging.getLogger(__name__)
12+
pool = ThreadPoolExecutor()
1013

1114

12-
class FileBufferManager(BaseBufferManager):
13-
_buffers = {}
14-
15-
def get_buffer_for_node(self, node_id, receptor):
16-
return self._buffers.setdefault(node_id, FileBuffer(node_id, receptor))
17-
18-
19-
class FileBuffer(BaseBuffer):
20-
21-
def __init__(self, node_id, receptor):
22-
super().__init__(node_id, receptor)
23-
self.node_id = node_id
24-
self.base_path = os.path.join(os.path.expanduser(self.receptor.config.default_data_dir))
25-
self.message_path = os.path.join(self.base_path, self.receptor.node_id, "messages")
26-
self.manifest_path = os.path.join(self.base_path, self.receptor.node_id, "manifest-{}".format(node_id))
27-
if not os.path.exists(self.message_path):
28-
os.makedirs(self.message_path, mode=0o700)
29-
30-
def __iter__(self):
31-
self.current = 0
32-
return self
33-
34-
def __next__(self):
35-
manifest = self.read_manifest()
36-
if len(manifest) <= self.current:
37-
raise StopIteration
38-
ident = manifest[self.current]
39-
current_payload = self.read(ident)
40-
self.current += 1
41-
return ident, current_payload
15+
class DurableBuffer:
4216

43-
def new_message(self):
44-
ident = str(uuid.uuid4())
45-
try:
46-
handle = open(os.path.join(self.message_path, ident), "wb")
47-
except Exception as e:
48-
raise ReceptorBufferError("Failed to generate new message file for {}: {}".format(self.node_id, e))
49-
return (ident, handle)
50-
51-
def read_message(self, ident, remove=True):
17+
def __init__(self, dir_, key, loop):
18+
self.q = asyncio.Queue()
19+
self._base_path = os.path.join(os.path.expanduser(dir_))
20+
self._message_path = os.path.join(self._base_path, "messages")
21+
self._manifest_path = os.path.join(self._base_path, f"manifest-{key}")
22+
self._loop = loop
23+
self._manifest_lock = asyncio.Lock(loop=self._loop)
5224
try:
53-
message_data = open(os.path.join(self.message_path, ident), "rb").read()
54-
if remove:
55-
os.remove(os.path.join(self.message_path, ident))
56-
except Exception as e:
57-
raise ReceptorBufferError("Failed to handle message data file for {} {}: {}".format(self.node_id, ident, e))
58-
return message_data
59-
60-
def write_manifest(self, manifest):
61-
try:
62-
fd = open(self.manifest_path, "w")
63-
json.dump(manifest, fd)
64-
fd.close()
65-
except Exception as e:
66-
raise ReceptorBufferError("Failed to handle metadata file for {}: {}".format(self.node_id, e))
67-
68-
def read_manifest(self):
69-
if not os.path.exists(self.manifest_path):
70-
return []
25+
os.makedirs(self._message_path, mode=0o700)
26+
except Exception:
27+
pass
28+
for item in self._read_manifest():
29+
self.q.put_nowait(item)
30+
31+
async def put(self, data):
32+
ident = str(uuid.uuid4())
33+
await self._loop.run_in_executor(pool, self._write_file, data, ident)
34+
await self.q.put(ident)
35+
await self._save_manifest()
36+
37+
async def get(self, handle_only=False, delete=True):
38+
while True:
39+
msg = await self.q.get()
40+
await self._save_manifest()
41+
try:
42+
return await self._get_file(msg, handle_only=handle_only, delete=delete)
43+
except FileNotFoundError:
44+
pass
45+
46+
async def _save_manifest(self):
47+
async with self._manifest_lock:
48+
await self._loop.run_in_executor(pool, self._write_manifest)
49+
50+
def _write_manifest(self):
51+
with open(self._manifest_path, "w") as fp:
52+
json.dump(list(self.q._queue), fp)
53+
54+
def _read_manifest(self):
7155
try:
72-
fd = open(self.manifest_path, "r")
73-
manifest = json.load(fd)
74-
return manifest
75-
except Exception as e:
76-
logger.warn("Failed to read manifest: {}".format(e))
56+
with open(self._manifest_path, "r") as fp:
57+
return json.load(fp)
58+
except FileNotFoundError:
7759
return []
7860

79-
def push(self, message):
80-
manifest = self.read_manifest()
81-
ident, handle = self.new_message()
82-
try:
83-
handle.write(message)
84-
handle.close()
85-
except Exception as e:
86-
raise ReceptorBufferError("Failed to write message file for {} {}: {}".format(self.node_id, ident, e))
87-
manifest.append(ident)
88-
self.write_manifest(manifest)
61+
def _path_for_ident(self, ident):
62+
return os.path.join(self._message_path, ident)
63+
64+
async def _get_file(self, ident, handle_only=False, delete=True):
65+
"""
66+
Retrieves a file from disk. If handle_only is True then we will
67+
return the handle to the file and do nothing else. Otherwise the file
68+
is read into memory all at once and returned. If delete is True (the
69+
default) and handle_only is False (the default) then the underlying
70+
file will be removed as well.
71+
"""
72+
path = self._path_for_ident(ident)
73+
fp = await self._loop.run_in_executor(pool, open, path, "rb")
74+
if handle_only:
75+
return fp
76+
bytes = await self._loop.run_in_executor(pool, lambda: fp.read())
77+
fp.close()
78+
if delete:
79+
await self._loop.run_in_executor(pool, os.remove, path)
80+
return bytes
81+
82+
def _write_file(self, data, ident):
83+
with open(os.path.join(self._message_path, ident), "wb") as fp:
84+
fp.write(data)
85+
86+
async def expire(self):
87+
async with self._manifest_lock:
88+
new_queue = asyncio.Queue()
89+
while self.q.qsize() > 0:
90+
ident = await self.q.get()
91+
data = await self._get_file(ident, handle_only=True, delete=False)
92+
msg = json.load(data)
93+
if "expire_time" in msg and msg['expire_time'] < time.time():
94+
logger.info("Expiring message %s", ident)
95+
# TODO: Do something with expired message
96+
await self._loop.run_in_executor(pool, os.remove, self._path_for_ident(ident))
97+
else:
98+
await new_queue.put(ident)
99+
self.q = new_queue
100+
89101

90-
def read(self, ident, remove=False):
91-
manifest = self.read_manifest()
92-
message = self.read_message(ident, remove=remove)
93-
if remove:
94-
manifest.remove(ident)
95-
self.write_manifest(manifest)
96-
return message
102+
class FileBufferManager(BaseBufferManager):
103+
_buffers = {}
97104

98-
def pop(self):
99-
manifest = self.read_manifest()
100-
item = self.read_message(manifest.pop(0))
101-
self.write_manifest(manifest)
102-
return item
105+
def get_buffer_for_node(self, node_id, receptor):
106+
# due to the way that the manager is constructed, we won't have enough
107+
# information to build a proper defaultdict at the time, and we want to
108+
# make sure we only construct a single instance of DurableBuffer
109+
# per-node so.. doing this the hard way.
110+
if node_id not in self._buffers:
111+
path = os.path.join(os.path.expanduser(receptor.config.default_data_dir))
112+
self._buffers[node_id] = DurableBuffer(path, node_id, asyncio.get_event_loop())
113+
return self._buffers[node_id]

receptor/buffers/memory.py

Lines changed: 0 additions & 33 deletions
This file was deleted.

receptor/protocol.py

Lines changed: 27 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,7 @@
66
import time
77
import uuid
88

9-
from collections import deque
10-
119
from .messages import envelope
12-
from .exceptions import ReceptorBufferError
1310
from .stats import connected_peers_guage
1411

1512
logger = logging.getLogger(__name__)
@@ -19,19 +16,19 @@
1916

2017

2118
class DataBuffer:
22-
def __init__(self, deserializer=json.loads):
23-
self.q = deque()
19+
def __init__(self, loop=None, deserializer=json.loads):
20+
self.q = asyncio.Queue(loop=loop)
2421
self.data_buffer = b""
2522
self.deserializer = deserializer
2623

2724
def add(self, data):
2825
self.data_buffer = self.data_buffer + data
2926
*ready, self.data_buffer = self.data_buffer.rsplit(DELIM)
30-
self.q.extend(ready)
27+
for chunk in ready:
28+
self.q.put_nowait(chunk)
3129

32-
def get(self):
33-
while self.q:
34-
yield self.deserializer(self.q.popleft())
30+
async def get(self):
31+
return self.deserializer(await self.q.get())
3532

3633

3734
class BaseProtocol(asyncio.Protocol):
@@ -54,28 +51,24 @@ async def watch_queue(self):
5451
buffer_obj = buffer_mgr.get_buffer_for_node(self.id, self.receptor)
5552
while not self.transport.is_closing():
5653
try:
57-
msg = buffer_obj.pop()
54+
msg = await buffer_obj.get()
55+
except Exception:
56+
logger.exception("Unhandled error when fetch from buffer for %s", self.id)
57+
continue
58+
59+
try:
5860
self.transport.write(msg + DELIM)
59-
except IndexError:
60-
await asyncio.sleep(0.1)
61-
except ReceptorBufferError as e:
62-
logger.exception("Receptor Buffer Read Error: {}".format(e))
63-
# TODO: We need to try to send this message along somewhere else
64-
# and record the failure somewhere
65-
self.transport.close()
66-
return
67-
except Exception as e:
68-
logger.exception("Error received trying to write to {}: {}".format(self.id, e))
69-
buffer_obj.push(msg)
61+
except Exception:
62+
logger.exception("Error received trying to write to %s", self.id)
63+
await buffer_obj.put(msg)
7064
self.transport.close()
7165
return
7266

7367
def connection_made(self, transport):
7468
self.peername = transport.get_extra_info('peername')
7569
self.transport = transport
76-
self.greeted = False
77-
self.incoming_buffer = DataBuffer()
7870
connected_peers_guage.inc()
71+
self.incoming_buffer = DataBuffer(loop=self.loop)
7972
self.loop.create_task(self.wait_greeting())
8073

8174
def connection_lost(self, exc):
@@ -91,21 +84,17 @@ async def wait_greeting(self):
9184
Initialized when the connection is established to handle the greeting
9285
before transitioning to message processing.
9386
'''
94-
while not self.greeted:
95-
logger.debug('Looking for handshake...')
96-
for data in self.incoming_buffer.get():
97-
logger.debug(data)
98-
if data["cmd"] == "HI":
99-
self.handle_handshake(data)
100-
break
101-
else:
102-
logger.error("Handshake failed!")
103-
self.transport.close()
104-
await asyncio.sleep(.1)
105-
logger.debug("handshake complete, starting normal handle loop")
87+
logger.debug('Looking for handshake...')
88+
data = await self.incoming_buffer.get()
89+
logger.debug(data)
90+
if data["cmd"] == "HI":
91+
self.handle_handshake(data)
92+
logger.debug("handshake complete, starting normal handle loop")
93+
else:
94+
logger.error("Handshake failed!")
95+
self.transport.close()
10696

10797
def handle_handshake(self, data):
108-
self.greeted = True
10998
self.id = data["id"]
11099
self.meta = data.get("meta", {})
111100
self.receptor.add_connection(self)
@@ -133,7 +122,7 @@ def handle_handshake(self, data):
133122
super().handle_handshake(data)
134123
logger.debug("Received handshake from client with id %s, responding...", data["id"])
135124
self.send_handshake()
136-
self.receptor.send_route_advertisement()
125+
self.loop.create_task(self.receptor.send_route_advertisement())
137126

138127

139128
async def create_peer(receptor, loop, host, port):
@@ -163,7 +152,7 @@ def connection_lost(self, exc):
163152
def handle_handshake(self, data):
164153
super().handle_handshake(data)
165154
logger.debug("Received handshake from server with id %s", data["id"])
166-
self.receptor.send_route_advertisement()
155+
self.loop.create_task(self.receptor.send_route_advertisement())
167156

168157

169158
class BasicControllerProtocol(asyncio.Protocol):

0 commit comments

Comments
 (0)