Skip to content

Commit 525762d

Browse files
committed
wiring DurableBuffer up
Signed-off-by: Jesse Jaggars <jjaggars@redhat.com>
1 parent dd5ad29 commit 525762d

5 files changed

Lines changed: 35 additions & 169 deletions

File tree

receptor/buffers/file.py

Lines changed: 16 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,7 @@
66
from collections import deque
77
from concurrent.futures import ThreadPoolExecutor
88

9-
from ..exceptions import ReceptorBufferError
10-
from .base import BaseBuffer, BaseBufferManager
9+
from .base import BaseBufferManager
1110

1211
logger = logging.getLogger(__name__)
1312
pool = ThreadPoolExecutor()
@@ -33,24 +32,23 @@ def __init__(self, dir_, key, loop):
3332
self.q._queue = deque(self._read_manifest())
3433

3534
async def put(self, data):
36-
async with self._manifest_lock:
37-
ident = str(uuid.uuid4())
38-
await self._loop.run_in_executor(pool, self._write_file, data, ident)
39-
await self.q.put(ident)
40-
await self._save_manifest()
35+
ident = str(uuid.uuid4())
36+
await self._loop.run_in_executor(pool, self._write_file, data, ident)
37+
await self.q.put(ident)
38+
await self._save_manifest()
4139

4240
async def get(self, handle_only=False, delete=True):
43-
async with self._manifest_lock:
44-
while True:
45-
msg = await self.q.get()
46-
await self._save_manifest()
47-
try:
48-
return await self._get_file(msg, handle_only=handle_only, delete=delete)
49-
except FileNotFoundError:
50-
pass
41+
while True:
42+
msg = await self.q.get()
43+
await self._save_manifest()
44+
try:
45+
return await self._get_file(msg, handle_only=handle_only, delete=delete)
46+
except FileNotFoundError:
47+
pass
5148

5249
async def _save_manifest(self):
53-
await self._loop.run_in_executor(pool, self._write_manifest)
50+
async with self._manifest_lock:
51+
await self._loop.run_in_executor(pool, self._write_manifest)
5452

5553
def _write_manifest(self):
5654
with open(self._manifest_path, "w") as fp:
@@ -90,91 +88,5 @@ class FileBufferManager(BaseBufferManager):
9088
_buffers = {}
9189

9290
def get_buffer_for_node(self, node_id, receptor):
93-
return self._buffers.setdefault(node_id, FileBuffer(node_id, receptor))
94-
95-
96-
class FileBuffer(BaseBuffer):
97-
98-
def __init__(self, node_id, receptor):
99-
super().__init__(node_id, receptor)
100-
self.node_id = node_id
101-
self.loop = asyncio.get_event_loop()
102-
self.base_path = os.path.join(os.path.expanduser(self.receptor.config.default_data_dir))
103-
self.message_path = os.path.join(self.base_path, self.receptor.node_id, "messages")
104-
self.manifest_path = os.path.join(self.base_path, self.receptor.node_id, "manifest-{}".format(node_id))
105-
if not os.path.exists(self.message_path):
106-
os.makedirs(self.message_path, mode=0o700)
107-
108-
def __iter__(self):
109-
self.current = 0
110-
return self
111-
112-
def __next__(self):
113-
manifest = self.read_manifest()
114-
if len(manifest) <= self.current:
115-
raise StopIteration
116-
ident = manifest[self.current]
117-
current_payload = self.read(ident)
118-
self.current += 1
119-
return ident, current_payload
120-
121-
def new_message(self):
122-
ident = str(uuid.uuid4())
123-
try:
124-
handle = open(os.path.join(self.message_path, ident), "wb")
125-
except Exception as e:
126-
raise ReceptorBufferError("Failed to generate new message file for {}: {}".format(self.node_id, e))
127-
return (ident, handle)
128-
129-
def read_message(self, ident, remove=True):
130-
try:
131-
message_data = open(os.path.join(self.message_path, ident), "rb").read()
132-
if remove:
133-
os.remove(os.path.join(self.message_path, ident))
134-
except Exception as e:
135-
raise ReceptorBufferError("Failed to handle message data file for {} {}: {}".format(self.node_id, ident, e))
136-
return message_data
137-
138-
def write_manifest(self, manifest):
139-
try:
140-
fd = open(self.manifest_path, "w")
141-
json.dump(manifest, fd)
142-
fd.close()
143-
except Exception as e:
144-
raise ReceptorBufferError("Failed to handle metadata file for {}: {}".format(self.node_id, e))
145-
146-
def read_manifest(self):
147-
if not os.path.exists(self.manifest_path):
148-
return []
149-
try:
150-
fd = open(self.manifest_path, "r")
151-
manifest = json.load(fd)
152-
return manifest
153-
except Exception as e:
154-
logger.warn("Failed to read manifest: {}".format(e))
155-
return []
156-
157-
def push(self, message):
158-
manifest = self.read_manifest()
159-
ident, handle = self.new_message()
160-
try:
161-
handle.write(message)
162-
handle.close()
163-
except Exception as e:
164-
raise ReceptorBufferError("Failed to write message file for {} {}: {}".format(self.node_id, ident, e))
165-
manifest.append(ident)
166-
self.write_manifest(manifest)
167-
168-
def read(self, ident, remove=False):
169-
manifest = self.read_manifest()
170-
message = self.read_message(ident, remove=remove)
171-
if remove:
172-
manifest.remove(ident)
173-
self.write_manifest(manifest)
174-
return message
175-
176-
def pop(self):
177-
manifest = self.read_manifest()
178-
item = self.read_message(manifest.pop(0))
179-
self.write_manifest(manifest)
180-
return item
91+
path = os.path.join(os.path.expanduser(receptor.config.default_data_dir))
92+
return self._buffers.setdefault(node_id, DurableBuffer(path, node_id, asyncio.get_event_loop()))

receptor/buffers/memory.py

Lines changed: 0 additions & 33 deletions
This file was deleted.

receptor/protocol.py

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import time
77
import uuid
88

9-
from .exceptions import ReceptorBufferError
109
from .messages import envelope
1110
from .stats import connected_peers_guage
1211

@@ -52,19 +51,13 @@ async def watch_queue(self):
5251
buffer_obj = buffer_mgr.get_buffer_for_node(self.id, self.receptor)
5352
while not self.transport.is_closing():
5453
try:
55-
msg = buffer_obj.pop()
54+
logger.debug("about to wait for a message")
55+
msg = await buffer_obj.get()
56+
logger.debug("got a message to send")
5657
self.transport.write(msg + DELIM)
57-
except IndexError:
58-
await asyncio.sleep(0.1)
59-
except ReceptorBufferError as e:
60-
logger.exception("Receptor Buffer Read Error: {}".format(e))
61-
# TODO: We need to try to send this message along somewhere else
62-
# and record the failure somewhere
63-
self.transport.close()
64-
return
65-
except Exception as e:
66-
logger.exception("Error received trying to write to {}: {}".format(self.id, e))
67-
buffer_obj.push(msg)
58+
except Exception:
59+
logger.exception("Error received trying to write to %s", self.id)
60+
await buffer_obj.put(msg)
6861
self.transport.close()
6962
return
7063

@@ -126,7 +119,7 @@ def handle_handshake(self, data):
126119
super().handle_handshake(data)
127120
logger.debug("Received handshake from client with id %s, responding...", data["id"])
128121
self.send_handshake()
129-
self.receptor.send_route_advertisement()
122+
self.loop.create_task(self.receptor.send_route_advertisement())
130123

131124

132125
async def create_peer(receptor, loop, host, port):
@@ -156,7 +149,7 @@ def connection_lost(self, exc):
156149
def handle_handshake(self, data):
157150
super().handle_handshake(data)
158151
logger.debug("Received handshake from server with id %s", data["id"])
159-
self.receptor.send_route_advertisement()
152+
self.loop.create_task(self.receptor.send_route_advertisement())
160153

161154

162155
class BasicControllerProtocol(asyncio.Protocol):

receptor/receptor.py

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ async def message_handler(self, buf):
101101
while True:
102102
data = await buf.get()
103103
if "cmd" in data and data["cmd"] == "ROUTE":
104-
self.handle_route_advertisement(data)
104+
await self.handle_route_advertisement(data)
105105
else:
106106
await self.handle_message(data)
107107

@@ -119,19 +119,19 @@ def remove_connection(self, protocol_obj):
119119
self.router.debug_router()
120120
self.update_connection_manifest(connection_node)
121121
notify_connections += self.connections[connection_node]
122-
self.send_route_advertisement(self.router.get_edges())
122+
protocol_obj.loop.create_task(self.send_route_advertisement(self.router.get_edges()))
123123

124124
async def shutdown_handler(self):
125125
while True:
126126
if self.stop:
127127
return
128128
await asyncio.sleep(1)
129129

130-
def handle_route_advertisement(self, data):
130+
async def handle_route_advertisement(self, data):
131131
self.router.add_edges(data["edges"])
132-
self.send_route_advertisement(data["edges"], data["seen"])
132+
await self.send_route_advertisement(data["edges"], data["seen"])
133133

134-
def send_route_advertisement(self, edges=None, seen=[]):
134+
async def send_route_advertisement(self, edges=None, seen=[]):
135135
edges = edges or self.router.get_edges()
136136
seen = set(seen)
137137
logger.debug("Emitting Route Advertisements, excluding {}".format(seen))
@@ -142,17 +142,14 @@ def send_route_advertisement(self, edges=None, seen=[]):
142142
for target in destinations:
143143
buf = self.buffer_mgr.get_buffer_for_node(target, self)
144144
try:
145-
buf.push(json.dumps({
145+
await buf.put(json.dumps({
146146
"cmd": "ROUTE",
147147
"id": self.node_id,
148148
"capabilities": self.work_manager.get_capabilities(),
149149
"groups": self.config.node_groups,
150150
"edges": edges,
151151
"seen": seens
152152
}).encode("utf-8"))
153-
except exceptions.ReceptorBufferError as e:
154-
logger.exception("Receptor Buffer Write Error broadcasting routes and capabilities: {}".format(e))
155-
# TODO: This might should be a hard shutdown event
156153
except Exception as e:
157154
logger.exception("Error trying to broadcast routes and capabilities: {}".format(e))
158155

receptor/router.py

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
1-
import logging
2-
31
import datetime
4-
from collections import defaultdict
52
import heapq
3+
import logging
64
import random
75
import uuid
6+
from collections import defaultdict
87

98
from dateutil import parser
9+
10+
from .exceptions import ReceptorBufferError, UnrouteableError
1011
from .messages import envelope
11-
from .exceptions import UnrouteableError, ReceptorBufferError
1212
from .stats import route_counter
1313

1414
logger = logging.getLogger(__name__)
@@ -147,7 +147,7 @@ async def forward(self, outer_envelope, next_hop):
147147
logger.debug(f'Forwarding frame {outer_envelope.frame_id} to {next_hop}')
148148
try:
149149
route_counter.inc()
150-
buffer_obj.push(outer_envelope.serialize().encode("utf-8"))
150+
await buffer_obj.put(outer_envelope.serialize().encode("utf-8"))
151151
except ReceptorBufferError as e:
152152
logger.exception("Receptor Buffer Write Error forwarding message to {}: {}".format(next_hop, e))
153153
# TODO: Possible to find another route? This might be a hard failure
@@ -188,6 +188,3 @@ async def send(self, inner_envelope, expected_response=False):
188188
if expected_response and inner_envelope.message_type == 'directive':
189189
self.response_registry[inner_envelope.message_id] = dict(message_sent_time=inner_envelope.timestamp)
190190
await self.forward(outer_envelope, next_node_id)
191-
192-
193-

0 commit comments

Comments
 (0)