1212pool = ThreadPoolExecutor ()
1313
1414
15+ class ManifestEncoder (json .JSONEncoder ):
16+ def default (self , o ):
17+ if isinstance (o , datetime .datetime ):
18+ return {
19+ "_type" : "datetime.datetime" ,
20+ "value" : o .isoformat (),
21+ }
22+ return super ().default (o )
23+
24+
25+ class ManifestDecoder (json .JSONDecoder ):
26+ def __init__ (self , * args , ** kwargs ):
27+ super ().__init__ (object_hook = self .object_hook , * args , ** kwargs )
28+
29+ def object_hook (self , o ):
30+ type_ = o .get ("_type" )
31+ if type_ != "datetime.datetime" :
32+ return o
33+
34+ t = o ['_type' ]
35+ return datetime .datetime .fromisoformat (o ["value" ])
36+
37+
1538class DurableBuffer :
1639
1740 def __init__ (self , dir_ , key , loop ):
@@ -31,7 +54,7 @@ def __init__(self, dir_, key, loop):
3154 async def put (self , data ):
3255 item = {
3356 "ident" : str (uuid .uuid4 ()),
34- "expire_time" : datetime .datetime .now () + datetime .timedelta (minutes = 5 ),
57+ "expire_time" : datetime .datetime .utcnow () + datetime .timedelta (minutes = 5 ),
3558 }
3659 await self ._loop .run_in_executor (pool , self ._write_file , data , item )
3760 await self .q .put (item )
@@ -42,7 +65,7 @@ async def get(self, handle_only=False, delete=True):
4265 msg = await self .q .get ()
4366 await self ._save_manifest ()
4467 try :
45- return await self ._get_file (msg , handle_only = handle_only , delete = delete )
68+ return await self ._get_file (msg [ "ident" ] , handle_only = handle_only , delete = delete )
4669 except FileNotFoundError :
4770 pass
4871
@@ -52,12 +75,12 @@ async def _save_manifest(self):
5275
5376 def _write_manifest (self ):
5477 with open (self ._manifest_path , "w" ) as fp :
55- json .dump (list (self .q ._queue ), fp )
78+ json .dump (list (self .q ._queue ), fp , cls = ManifestEncoder )
5679
5780 def _read_manifest (self ):
5881 try :
5982 with open (self ._manifest_path , "r" ) as fp :
60- return json .load (fp )
83+ return json .load (fp , cls = ManifestDecoder )
6184 except FileNotFoundError :
6285 return []
6386 except json .decoder .JSONDecodeError :
@@ -97,7 +120,7 @@ async def expire(self):
97120 item = await self .q .get ()
98121 ident = item ["ident" ]
99122 expire_time = item ["expire_time" ]
100- if expire_time > datetime .datetime .now ():
123+ if expire_time > datetime .datetime .utcnow ():
101124 logger .info ("Expiring message %s" , ident )
102125 # TODO: Do something with expired message
103126 await self ._loop .run_in_executor (pool , os .remove , self ._path_for_ident (ident ))
0 commit comments