11import asyncio
22import base64
33import datetime
4+ import io
45import itertools
56import json
67import logging
@@ -63,7 +64,6 @@ async def get(self):
6364class Frame :
6465 START_MSG = 0
6566 PAYLOAD = 1
66- FINISH = 2
6767
6868 def __init__ (self , type_ , version , length , msg_id , id_ ):
6969 self .type = type_
@@ -73,22 +73,25 @@ def __init__(self, type_, version, length, msg_id, id_):
7373 self .id = id_
7474
7575 def serialize (self ):
76- high , low = ((self .msg_id >> 64 ) & MAX_INT64 , self .msg_id & MAX_INT64 )
77- return b'' .join ([
78- pack ("ccIi" , chr (self .type ).encode ("ascii" ), chr (self .version ).encode ("ascii" ), self .id , self .length ),
79- pack (">QQ" , high , low ),
80- ])
76+ return pack (">ccIIQQ" , bytes ([self .type ]), bytes ([self .version ]), self .id , self .length , * split_uuid (self .msg_id ))
8177
8278 @classmethod
8379 def deserialize (cls , buf ):
84- t , v , i , length = unpack ("ccIi" , buf [0 :12 ])
85- hi , lo = unpack (">QQ" , buf [12 :])
86- msg_id = (hi << 64 ) | lo
80+ t , v , i , length , hi , lo = unpack (">ccIIQQ" , buf )
81+ msg_id = join_uuid (hi , lo )
8782 return cls (ord (t ), ord (v ), length , msg_id , i )
8883
8984 @classmethod
9085 def from_data (cls , data ):
91- return cls .deserialize (data [:28 ]), data [28 :]
86+ return cls .deserialize (data [:26 ]), data [26 :]
87+
88+
89+ def split_uuid (data ):
90+ return ((data >> 64 ) & MAX_INT64 , data & MAX_INT64 )
91+
92+
93+ def join_uuid (hi , lo ):
94+ return (hi << 64 ) | lo
9295
9396
9497class Header :
@@ -107,21 +110,22 @@ def __eq__(self, other):
107110 return (self .sender , self .recipient , self .route_list ) == (other .sender , other .recipient , other .route_list )
108111
109112
110- def gen_chunks (buffer , header , msg_id = None , chunksize = 2 ** 8 ):
113+ def gen_chunks (data , header , msg_id = None , chunksize = 2 ** 8 ):
111114 if msg_id is None :
112115 msg_id = uuid .uuid4 ().int
113116 seq = itertools .count ()
114117 buf = bytearray (chunksize )
115118 bv = memoryview (buf )
116119 header = header .serialize ()
117120 yield Frame (Frame .START_MSG , 1 , len (header ), msg_id , next (seq )).serialize () + header
121+ yield Frame (Frame .PAYLOAD , 1 , len (data ), msg_id , next (seq )).serialize ()
122+ buffer = io .BytesIO (data )
118123 bytes_read = buffer .readinto (buf )
119124 while bytes_read :
120- f = Frame (Frame .PAYLOAD , 1 , bytes_read , msg_id , next (seq )).serialize ()
121125 if bytes_read == chunksize :
122- yield f + bv .tobytes ()
126+ yield bv .tobytes ()
123127 else :
124- yield f + bv [:bytes_read ].tobytes ()
128+ yield bv [:bytes_read ].tobytes ()
125129 bytes_read = buffer .readinto (buf )
126130
127131
0 commit comments