Skip to content

Commit 0677bd7

Browse files
authored
Merge pull request #20 from matburt/message_expiry
Add envelop element for expiration, track connections persistently
2 parents 678e167 + 577753c commit 0677bd7

7 files changed

Lines changed: 114 additions & 17 deletions

File tree

receptor/buffers/base.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@ def __init__(self, node_id, receptor):
1414
self.node_id = node_id
1515
self.receptor = receptor
1616

17+
def __iter__(self):
18+
raise NotImplementedError
19+
20+
def __next__(self):
21+
raise NotImplementedError
22+
1723
def push(self, message):
1824
raise NotImplementedError()
1925

receptor/buffers/file.py

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import collections
21
import logging
32
import uuid
43
import json
@@ -28,6 +27,18 @@ def __init__(self, node_id, receptor):
2827
if not os.path.exists(self.message_path):
2928
os.makedirs(self.message_path, mode=0o700)
3029

30+
def __iter__(self):
31+
self.current = 0
32+
return self
33+
34+
def __next__(self):
35+
manifest = self.read_manifest()
36+
if len(manifest) <= self.current:
37+
raise StopIteration
38+
ident = manifest[self.current]
39+
current_payload = self.read(ident)
40+
self.current += 1
41+
return ident, current_payload
3142

3243
def new_message(self):
3344
ident = str(uuid.uuid4())
@@ -37,10 +48,11 @@ def new_message(self):
3748
raise ReceptorBufferError("Failed to generate new message file for {}: {}".format(self.node_id, e))
3849
return (ident, handle)
3950

40-
def read_message(self, ident):
51+
def read_message(self, ident, remove=True):
4152
try:
4253
message_data = open(os.path.join(self.message_path, ident), "rb").read()
43-
os.remove(os.path.join(self.message_path, ident))
54+
if remove:
55+
os.remove(os.path.join(self.message_path, ident))
4456
except Exception as e:
4557
raise ReceptorBufferError("Failed to handle message data file for {} {}: {}".format(self.node_id, ident, e))
4658
return message_data
@@ -75,6 +87,14 @@ def push(self, message):
7587
manifest.append(ident)
7688
self.write_manifest(manifest)
7789

90+
def read(self, ident, remove=False):
91+
manifest = self.read_manifest()
92+
message = self.read_message(ident, remove=remove)
93+
if remove:
94+
manifest.remove(ident)
95+
self.write_manifest(manifest)
96+
return message
97+
7898
def pop(self):
7999
manifest = self.read_manifest()
80100
item = self.read_message(manifest.pop(0))

receptor/controller.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ def mainloop(receptor, socket_path, loop=asyncio.get_event_loop()):
3535
)
3636
logger.info(f'Opening control socket on {socket_path}')
3737
loop.create_task(control_listener)
38+
loop.create_task(receptor.watch_expire())
3839
try:
3940
loop.run_forever()
4041
except KeyboardInterrupt:

receptor/messages/envelope.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import json
44
import logging
55
import uuid
6+
import time
67

78
logger = logging.getLogger(__name__)
89

@@ -37,7 +38,7 @@ def serialize(self):
3738
class InnerEnvelope:
3839
def __init__(self, receptor, message_id, sender, recipient, message_type, timestamp,
3940
raw_payload, directive=None, in_response_to=None, ttl=None,
40-
serial=1):
41+
serial=1, expire_time_delta=300):
4142
self.receptor = receptor
4243
self.message_id = message_id
4344
self.sender = sender
@@ -48,6 +49,9 @@ def __init__(self, receptor, message_id, sender, recipient, message_type, timest
4849
self.directive = directive # None if response, 'namespace:action' if not
4950
self.in_response_to = in_response_to # None if directive, a message_id if not
5051
self.ttl = ttl # Optional
52+
if not expire_time_delta:
53+
self.expire_time = None
54+
self.expire_time = time.time() + expire_time_delta
5155
self.serial = serial # serial index of responses
5256

5357
@classmethod

receptor/node.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ def mainloop(receptor, ping_interval=None, loop=asyncio.get_event_loop(), skip_r
1919
if ping_interval:
2020
ping_time = (((int(loop.time()) + 1) // ping_interval) + 1) * ping_interval
2121
loop.call_at(ping_time, loop.create_task, send_pings_and_reschedule(receptor, loop, ping_time, ping_interval))
22+
loop.create_task(receptor.watch_expire())
2223
if not skip_run:
2324
try:
2425
loop.run_until_complete(receptor.shutdown_handler())

receptor/protocol.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import datetime
22
import asyncio
33
import logging
4+
import time
45
import json
56
import uuid
67
from collections import deque
@@ -35,6 +36,11 @@ def __init__(self, receptor, loop):
3536
self.loop = loop
3637

3738
async def watch_queue(self, node, transport):
39+
'''
40+
Watches the buffer for this connection for messages delivered from other
41+
parts of Receptor (forwarded messages for example) for messages to send
42+
over the connection.
43+
'''
3844
buffer_mgr = self.receptor.config.components.buffer_manager
3945
buffer_obj = buffer_mgr.get_buffer_for_node(node, self.receptor)
4046
while not transport.is_closing():
@@ -63,13 +69,17 @@ def connection_made(self, transport):
6369
self.loop.create_task(self.wait_greeting())
6470

6571
def connection_lost(self, exc):
66-
self.receptor.remove_connection(self)
72+
self.receptor.remove_connection(self.connection)
6773

6874
def data_received(self, data):
6975
logger.debug(data)
7076
self.incoming_buffer.add(data)
7177

7278
async def wait_greeting(self):
79+
'''
80+
Initialized when the connection is established to handle the greeting
81+
before transitioning to message processing.
82+
'''
7383
while not self.greeted:
7484
logger.debug('Looking for handshake...')
7585
for data in self.incoming_buffer.get():
@@ -79,10 +89,10 @@ async def wait_greeting(self):
7989
break
8090
else:
8191
logger.error("Handshake failed!")
82-
# TODO: Trigger disconnection
92+
self.transport.close()
8393
await asyncio.sleep(.1)
8494
logger.debug("handshake complete, starting normal handle loop")
85-
self.loop.create_task(self.connection.message_handler(self.incoming_buffer)) # Duplicated?
95+
self.loop.create_task(self.connection.message_handler(self.incoming_buffer)) # Duplicated (see handle_handshake)?
8696

8797
def handle_handshake(self, data):
8898
self.greeted = True
@@ -94,6 +104,7 @@ def send_handshake(self):
94104
msg = json.dumps({
95105
"cmd": "HI",
96106
"id": self.receptor.node_id,
107+
"expire_time": time.time() + 10,
97108
}).encode("utf-8")
98109
self.transport.write(msg + DELIM)
99110

@@ -175,7 +186,6 @@ def data_received(self, data):
175186
raw_payload=payload,
176187
directive=directive
177188
)
178-
# TODO: Response expiration task?
179189
# TODO: Persistent registry?
180190
self.loop.create_task(self.receptor.router.send(inner_env,
181191
expected_response=True))

receptor/receptor.py

Lines changed: 64 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
import os
2+
import json
23
import uuid
4+
import time
35
import asyncio
46
import logging
7+
import copy
58

69
from .config import ReceptorConfig
710
from .router import MeshRouter
@@ -20,6 +23,9 @@ def __init__(self, config=None, node_id=None, router_cls=None,
2023
self.work_manager = (work_manager_cls or WorkManager)(self)
2124
self.connections = dict()
2225
self.controller_connections = []
26+
self.connection_manifest_path = os.path.join(self.config.server.data_dir,
27+
self.node_id,
28+
"connection_manifest")
2329
self.stop = False
2430

2531
def _find_node_id(self):
@@ -32,31 +38,80 @@ def _find_node_id(self):
3238
ofs.write(f'\nRECEPTOR_NODE_ID={node_id}\n')
3339
return str(node_id)
3440

41+
async def watch_expire(self):
42+
while True:
43+
current_manifest = self.get_connection_manifest()
44+
for connection in current_manifest:
45+
buffer = self.config.components.buffer_manager.get_buffer_for_node(connection["id"], self)
46+
for ident, message in buffer:
47+
message_actual = json.loads(message)
48+
if "expire_time" in message_actual and message_actual['expire_time'] < time.time():
49+
buffer.read_message(ident, remove=True)
50+
logger.info("Expiring message {}:{}".format(ident, connection["id"]))
51+
# TODO: Do something with expired message
52+
if connection["last"] + 86400 < time.time():
53+
logger.info("Expiring connection {}".format(connection["id"]))
54+
write_manifest = copy.copy(current_manifest)
55+
write_manifest.remove(connection)
56+
self.write_connection_manifest(write_manifest)
57+
await asyncio.sleep(600)
58+
59+
def get_connection_manifest(self):
60+
if not os.path.exists(self.connection_manifest_path):
61+
return []
62+
try:
63+
fd = open(self.connection_manifest_path, "r")
64+
manifest = json.load(fd)
65+
return manifest
66+
except Exception as e:
67+
logger.warn("Failed to read connection manifest: {}".format(e))
68+
return []
69+
70+
def write_connection_manifest(self, manifest):
71+
fd = open(self.connection_manifest_path, "w")
72+
json.dump(manifest, fd)
73+
fd.close()
74+
75+
def update_connection_manifest(self, connection):
76+
manifest = self.get_connection_manifest()
77+
found = False
78+
for node in manifest:
79+
if node["id"] == connection:
80+
node["last"] = time.time()
81+
found = True
82+
break
83+
if not found:
84+
manifest.append(dict(id=connection,
85+
last=time.time()))
86+
self.write_connection_manifest(manifest)
87+
3588
def update_connections(self, connection):
3689
self.router.register_edge(connection.id_, self.node_id, 1)
3790
if connection.id_ in self.connections:
3891
self.connections[connection.id_].append(connection)
3992
else:
4093
self.connections[connection.id_] = [connection]
94+
self.update_connection_manifest(connection.id_)
4195

4296
def add_connection(self, id_, protocol_obj):
4397
buffer_mgr = self.config.components.buffer_manager
4498
conn = Connection(id_, protocol_obj, buffer_mgr, self)
4599
self.update_connections(conn)
46100
return conn
47101

48-
def remove_connection(self, conn):
49-
notify_protocols = []
102+
def remove_connection(self, protocol_obj):
103+
notify_connections = []
50104
for connection_node in self.connections:
51-
if conn in self.connections[connection_node]:
52-
logger.info("Removing connection {} for node {}".format(conn, connection_node))
53-
self.connections[connection_node].remove(conn)
105+
if protocol_obj in self.connections[connection_node]:
106+
logger.info("Removing connection {} for node {}".format(protocol_obj, connection_node))
107+
self.update_connection_manifest(connection_node)
108+
self.connections[connection_node].remove(protocol_obj)
54109
self.router.update_node(self.node_id, connection_node, 100)
55110
self.router.debug_router()
56-
notify_protocols += self.connections[connection_node]
57-
# TODO: Broadcast update, set timer for full expiration
58-
for active_protocol in notify_protocols:
59-
active_protocol.send_route_advertisement(self.router.get_edges())
111+
self.update_connection_manifest(connection_node)
112+
notify_connections += self.connections[connection_node]
113+
for active_connection in notify_connections:
114+
active_connection.send_route_advertisement(self.router.get_edges())
60115

61116
async def shutdown_handler(self):
62117
while True:

0 commit comments

Comments
 (0)