Skip to content

Commit 678e167

Browse files
committed
Refactor buffer manager to take receptor object
In order to get the local node-id Fix an issue importing exception in the buffer object itself
1 parent e266b1b commit 678e167

6 files changed

Lines changed: 19 additions & 19 deletions

File tree

receptor/buffers/base.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,16 @@
33

44

55
class BaseBufferManager:
6-
def get_buffer_for_node(self, node_id, config):
6+
def get_buffer_for_node(self, node_id, receptor):
77
raise NotImplementedError()
88

99

1010
class BaseBuffer:
1111
node_id = None
1212

13-
def __init__(self, node_id, config):
13+
def __init__(self, node_id, receptor):
1414
self.node_id = node_id
15-
self.config = config
15+
self.receptor = receptor
1616

1717
def push(self, message):
1818
raise NotImplementedError()

receptor/buffers/file.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,26 +5,26 @@
55
import os
66

77
from .base import BaseBufferManager, BaseBuffer
8-
from .exceptions import ReceptorBufferError
8+
from ..exceptions import ReceptorBufferError
99

1010
logger = logging.getLogger(__name__)
1111

1212

1313
class FileBufferManager(BaseBufferManager):
1414
_buffers = {}
1515

16-
def get_buffer_for_node(self, node_id, config):
17-
return self._buffers.setdefault(node_id, FileBuffer(node_id, config))
16+
def get_buffer_for_node(self, node_id, receptor):
17+
return self._buffers.setdefault(node_id, FileBuffer(node_id, receptor))
1818

1919

2020
class FileBuffer(BaseBuffer):
2121

22-
def __init__(self, node_id, config):
23-
super().__init__(node_id, config)
22+
def __init__(self, node_id, receptor):
23+
super().__init__(node_id, receptor)
2424
self.node_id = node_id
25-
self.base_path = os.path.join(os.path.expanduser(self.config.server.data_dir))
26-
self.message_path = os.path.join(self.base_path, "messages")
27-
self.manifest_path = os.path.join(self.base_path, "manifest-{}".format(node_id))
25+
self.base_path = os.path.join(os.path.expanduser(self.receptor.config.server.data_dir))
26+
self.message_path = os.path.join(self.base_path, self.receptor.node_id, "messages")
27+
self.manifest_path = os.path.join(self.base_path, self.receptor.node_id, "manifest-{}".format(node_id))
2828
if not os.path.exists(self.message_path):
2929
os.makedirs(self.message_path, mode=0o700)
3030

receptor/buffers/memory.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,14 @@
99
class InMemoryBufferManager(BaseBufferManager):
1010
_buffers = {}
1111

12-
def get_buffer_for_node(self, node_id, config):
12+
def get_buffer_for_node(self, node_id, receptor):
1313
return self._buffers.setdefault(node_id, InMemoryBuffer(node_id))
1414

1515

1616
# NOTE: This is not thread safe
1717
class InMemoryBuffer(BaseBuffer):
18-
def __init__(self, node_id, config):
19-
super().__init__(node_id, config)
18+
def __init__(self, node_id, receptor):
19+
super().__init__(node_id, receptor)
2020
self._buffer = collections.deque()
2121

2222
def push(self, message):

receptor/connection.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ def send_route_advertisement(self, edges=None, seen=[]):
4747

4848
# TODO: This should be a broadcast call to the connection manager
4949
for target in destinations:
50-
buf = self.buffer_mgr.get_buffer_for_node(target, self.receptor.config)
50+
buf = self.buffer_mgr.get_buffer_for_node(target, self.receptor)
5151
try:
5252
buf.push(json.dumps({
5353
"cmd": "ROUTE",

receptor/protocol.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,13 @@ def __init__(self, receptor, loop):
3636

3737
async def watch_queue(self, node, transport):
3838
buffer_mgr = self.receptor.config.components.buffer_manager
39-
buffer_obj = buffer_mgr.get_buffer_for_node(node, self.receptor.config)
39+
buffer_obj = buffer_mgr.get_buffer_for_node(node, self.receptor)
4040
while not transport.is_closing():
4141
try:
4242
msg = buffer_obj.pop()
4343
transport.write(msg + DELIM)
4444
except IndexError:
45-
await asyncio.sleep(1)
45+
await asyncio.sleep(0.1)
4646
except ReceptorBufferError as e:
4747
logger.exception("Receptor Buffer Read Error: {}".format(e))
4848
# TODO: We need to try to send this message along somewhere else
@@ -82,7 +82,7 @@ async def wait_greeting(self):
8282
# TODO: Trigger disconnection
8383
await asyncio.sleep(.1)
8484
logger.debug("handshake complete, starting normal handle loop")
85-
self.loop.create_task(self.connection.message_handler(self.incoming_buffer))
85+
self.loop.create_task(self.connection.message_handler(self.incoming_buffer)) # Duplicated?
8686

8787
def handle_handshake(self, data):
8888
self.greeted = True

receptor/router.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ async def forward(self, outer_envelope, next_hop):
133133
Forward a message on to the next hop closer to its destination
134134
"""
135135
buffer_mgr = self.receptor.config.components.buffer_manager
136-
buffer_obj = buffer_mgr.get_buffer_for_node(next_hop, self.receptor.config)
136+
buffer_obj = buffer_mgr.get_buffer_for_node(next_hop, self.receptor)
137137
outer_envelope.route_list.append(self.node_id)
138138
logger.debug(f'Forwarding frame {outer_envelope.frame_id} to {next_hop}')
139139
try:

0 commit comments

Comments
 (0)