Skip to content

Commit 34e5127

Browse files
committed
Implement message expiration at the connection level
1 parent 3d3e2ba commit 34e5127

5 files changed

Lines changed: 67 additions & 18 deletions

File tree

receptor/buffers/base.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@ def __init__(self, node_id, receptor):
1414
self.node_id = node_id
1515
self.receptor = receptor
1616

17+
def __iter__(self):
18+
raise NotImplementedError
19+
20+
def __next__(self):
21+
raise NotImplementedError
22+
1723
def push(self, message):
1824
raise NotImplementedError()
1925

receptor/buffers/file.py

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,18 @@ def __init__(self, node_id, receptor):
2828
if not os.path.exists(self.message_path):
2929
os.makedirs(self.message_path, mode=0o700)
3030

31+
def __iter__(self):
32+
self.current = 0
33+
return self
34+
35+
def __next__(self):
36+
manifest = self.read_manifest()
37+
if len(manifest) <= self.current:
38+
raise StopIteration
39+
ident = manifest[self.current]
40+
current_payload = self.read(ident)
41+
self.current += 1
42+
return ident, current_payload
3143

3244
def new_message(self):
3345
ident = str(uuid.uuid4())
@@ -37,10 +49,11 @@ def new_message(self):
3749
raise ReceptorBufferError("Failed to generate new message file for {}: {}".format(self.node_id, e))
3850
return (ident, handle)
3951

40-
def read_message(self, ident):
52+
def read_message(self, ident, remove=True):
4153
try:
4254
message_data = open(os.path.join(self.message_path, ident), "rb").read()
43-
os.remove(os.path.join(self.message_path, ident))
55+
if remove:
56+
os.remove(os.path.join(self.message_path, ident))
4457
except Exception as e:
4558
raise ReceptorBufferError("Failed to handle message data file for {} {}: {}".format(self.node_id, ident, e))
4659
return message_data
@@ -75,6 +88,14 @@ def push(self, message):
7588
manifest.append(ident)
7689
self.write_manifest(manifest)
7790

91+
def read(self, ident, remove=False):
92+
manifest = self.read_manifest()
93+
message = self.read_message(ident, remove=remove)
94+
if remove:
95+
manifest.remove(ident)
96+
self.write_manifest(manifest)
97+
return message
98+
7899
def pop(self):
79100
manifest = self.read_manifest()
80101
item = self.read_message(manifest.pop(0))

receptor/node.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ def mainloop(receptor, ping_interval=None, loop=asyncio.get_event_loop(), skip_r
1919
if ping_interval:
2020
ping_time = (((int(loop.time()) + 1) // ping_interval) + 1) * ping_interval
2121
loop.call_at(ping_time, loop.create_task, send_pings_and_reschedule(receptor, loop, ping_time, ping_interval))
22+
loop.create_task(receptor.watch_expire())
2223
if not skip_run:
2324
try:
2425
loop.run_until_complete(receptor.shutdown_handler())

receptor/protocol.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,6 @@ def __init__(self, receptor, loop):
3434
self.receptor = receptor
3535
self.loop = loop
3636

37-
async def watch_message_expire(self):
38-
pass
39-
4037
async def watch_queue(self, node, transport):
4138
buffer_mgr = self.receptor.config.components.buffer_manager
4239
buffer_obj = buffer_mgr.get_buffer_for_node(node, self.receptor)

receptor/receptor.py

Lines changed: 37 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import time
55
import asyncio
66
import logging
7+
import copy
78

89
from .config import ReceptorConfig
910
from .router import MeshRouter
@@ -37,6 +38,26 @@ def _find_node_id(self):
3738
ofs.write(f'\nRECEPTOR_NODE_ID={node_id}\n')
3839
return str(node_id)
3940

41+
async def watch_expire(self):
42+
while True:
43+
logger.info("Checking expirations")
44+
current_manifest = self.get_connection_manifest()
45+
for connection in current_manifest:
46+
buffer = self.config.components.buffer_manager.get_buffer_for_node(connection["id"], self)
47+
for ident, message in buffer:
48+
message_actual = json.loads(message)
49+
logger.info("Examining {}".format(message_data))
50+
if "expire_time" in message_data and message_data['expire_time'] < time.time():
51+
logger.info("Expiring message {}:{}".format(ident, connection["id"]))
52+
expired_message = buffer.read_message(ident, remove=True)
53+
# TODO: Do something with expired message
54+
if connection["last"] + 86400 < time.time():
55+
logger.info("Expiring connection {}".format(connection["id"]))
56+
write_manifest = copy.copy(current_manifest)
57+
write_manifest.remove(connection)
58+
self.write_connection_manifest(write_manifest)
59+
await asyncio.sleep(600)
60+
4061
def get_connection_manifest(self):
4162
if not os.path.exists(self.connection_manifest_path):
4263
return []
@@ -48,43 +69,46 @@ def get_connection_manifest(self):
4869
logger.warn("Failed to read connection manifest: {}".format(e))
4970
return []
5071

72+
def write_connection_manifest(self, manifest):
73+
fd = open(self.connection_manifest_path, "w")
74+
json.dump(manifest, fd)
75+
fd.close()
76+
5177
def update_connection_manifest(self, connection):
5278
manifest = self.get_connection_manifest()
5379
found = False
5480
for node in manifest:
55-
if node["id"] == connection.id_:
81+
if node["id"] == connection:
5682
node["last"] = time.time()
5783
found = True
5884
break
5985
if not found:
60-
node.append(dict(id=connection.id_,
61-
last=time.time()))
62-
fd = open(self.connection_manifest_path, "w")
63-
json.dump(manifest, fd)
64-
fd.close()
86+
manifest.append(dict(id=connection.id_,
87+
last=time.time()))
88+
self.write_connection_manifest(manifest)
6589

66-
6790
def update_connections(self, connection):
6891
self.router.register_edge(connection.id_, self.node_id, 1)
6992
if connection.id_ in self.connections:
7093
self.connections[connection.id_].append(connection)
7194
else:
7295
self.connections[connection.id_] = [connection]
73-
self.update_connection_manifest(connection)
96+
self.update_connection_manifest(connection.id_)
7497

7598
def add_connection(self, id_, protocol_obj):
7699
buffer_mgr = self.config.components.buffer_manager
77100
conn = Connection(id_, protocol_obj, buffer_mgr, self)
78101
self.update_connections(conn)
79102
return conn
80103

81-
def remove_connection(self, conn):
104+
def remove_connection(self, protocol_obj):
82105
notify_protocols = []
83-
self.update_connection_manifest(conn)
106+
#self.update_connection_manifest(conn)
84107
for connection_node in self.connections:
85-
if conn in self.connections[connection_node]:
86-
logger.info("Removing connection {} for node {}".format(conn, connection_node))
87-
self.connections[connection_node].remove(conn)
108+
if protocol_obj in self.connections[connection_node]:
109+
logger.info("Removing connection {} for node {}".format(protocol_obj, connection_node))
110+
self.update_connection_manifest(connection_node)
111+
self.connections[connection_node].remove(protocol_obj)
88112
self.router.update_node(self.node_id, connection_node, 100)
89113
self.router.debug_router()
90114
notify_protocols += self.connections[connection_node]

0 commit comments

Comments
 (0)