Skip to content

Commit 3d3e2ba

Browse files
committed
Add envelop element for expiration, track connections persistently
1 parent 678e167 commit 3d3e2ba

3 files changed

Lines changed: 42 additions & 1 deletion

File tree

receptor/messages/envelope.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import json
44
import logging
55
import uuid
6+
import time
67

78
logger = logging.getLogger(__name__)
89

@@ -37,7 +38,7 @@ def serialize(self):
3738
class InnerEnvelope:
3839
def __init__(self, receptor, message_id, sender, recipient, message_type, timestamp,
3940
raw_payload, directive=None, in_response_to=None, ttl=None,
40-
serial=1):
41+
serial=1, expire_time_delta=300):
4142
self.receptor = receptor
4243
self.message_id = message_id
4344
self.sender = sender
@@ -48,6 +49,9 @@ def __init__(self, receptor, message_id, sender, recipient, message_type, timest
4849
self.directive = directive # None if response, 'namespace:action' if not
4950
self.in_response_to = in_response_to # None if directive, a message_id if not
5051
self.ttl = ttl # Optional
52+
if not expire_time_delta:
53+
self.expire_time = None
54+
self.expire_time = time.time() + expire_time_delta
5155
self.serial = serial # serial index of responses
5256

5357
@classmethod

receptor/protocol.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ def __init__(self, receptor, loop):
3434
self.receptor = receptor
3535
self.loop = loop
3636

37+
async def watch_message_expire(self):
38+
pass
39+
3740
async def watch_queue(self, node, transport):
3841
buffer_mgr = self.receptor.config.components.buffer_manager
3942
buffer_obj = buffer_mgr.get_buffer_for_node(node, self.receptor)

receptor/receptor.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import os
2+
import json
23
import uuid
4+
import time
35
import asyncio
46
import logging
57

@@ -20,6 +22,9 @@ def __init__(self, config=None, node_id=None, router_cls=None,
2022
self.work_manager = (work_manager_cls or WorkManager)(self)
2123
self.connections = dict()
2224
self.controller_connections = []
25+
self.connection_manifest_path = os.path.join(self.config.server.data_dir,
26+
self.node_id,
27+
"connection_manifest")
2328
self.stop = False
2429

2530
def _find_node_id(self):
@@ -32,12 +37,40 @@ def _find_node_id(self):
3237
ofs.write(f'\nRECEPTOR_NODE_ID={node_id}\n')
3338
return str(node_id)
3439

40+
def get_connection_manifest(self):
41+
if not os.path.exists(self.connection_manifest_path):
42+
return []
43+
try:
44+
fd = open(self.connection_manifest_path, "r")
45+
manifest = json.load(fd)
46+
return manifest
47+
except Exception as e:
48+
logger.warn("Failed to read connection manifest: {}".format(e))
49+
return []
50+
51+
def update_connection_manifest(self, connection):
52+
manifest = self.get_connection_manifest()
53+
found = False
54+
for node in manifest:
55+
if node["id"] == connection.id_:
56+
node["last"] = time.time()
57+
found = True
58+
break
59+
if not found:
60+
node.append(dict(id=connection.id_,
61+
last=time.time()))
62+
fd = open(self.connection_manifest_path, "w")
63+
json.dump(manifest, fd)
64+
fd.close()
65+
66+
3567
def update_connections(self, connection):
3668
self.router.register_edge(connection.id_, self.node_id, 1)
3769
if connection.id_ in self.connections:
3870
self.connections[connection.id_].append(connection)
3971
else:
4072
self.connections[connection.id_] = [connection]
73+
self.update_connection_manifest(connection)
4174

4275
def add_connection(self, id_, protocol_obj):
4376
buffer_mgr = self.config.components.buffer_manager
@@ -47,6 +80,7 @@ def add_connection(self, id_, protocol_obj):
4780

4881
def remove_connection(self, conn):
4982
notify_protocols = []
83+
self.update_connection_manifest(conn)
5084
for connection_node in self.connections:
5185
if conn in self.connections[connection_node]:
5286
logger.info("Removing connection {} for node {}".format(conn, connection_node))

0 commit comments

Comments
 (0)