@@ -36,21 +36,22 @@ async def put(self, data):
3636
3737 async def handle_frame (self , data ):
3838 self .current_frame , rest = Frame .from_data (data )
39- if self .current_frame .type in (Frame .START_MSG , Frame .PAYLOAD ):
40- self .to_read = self .current_frame .length
41- await self .consume (rest )
42- else :
39+
40+ if self .current_frame .type not in (Frame .HEADER , Frame .PAYLOAD ):
4341 raise Exception ("Unknown Frame Type" )
4442
43+ self .to_read = self .current_frame .length
44+ await self .consume (rest )
45+
4546 async def consume (self , data ):
4647 self .to_read -= len (data )
4748 self .bb += data
4849 if self .to_read == 0 :
4950 await self .finish ()
5051
5152 async def finish (self ):
52- if self .current_frame .type == Frame .START_MSG :
53- self .header = Header ( ** json . loads (self .bb ) )
53+ if self .current_frame .type == Frame .HEADER :
54+ self .header = Header . from_bytes (self .bb )
5455 elif self .current_frame .type == Frame .PAYLOAD :
5556 await self .q .put ((self .header , self .bb ))
5657 self .header = None
@@ -62,9 +63,11 @@ async def get(self):
6263
6364
6465class Frame :
65- START_MSG = 0
66+ HEADER = 0
6667 PAYLOAD = 1
6768
69+ __slots__ = ('type' , 'version' , 'length' , 'msg_id' , 'id' )
70+
6871 def __init__ (self , type_ , version , length , msg_id , id_ ):
6972 self .type = type_
7073 self .version = version
@@ -103,8 +106,12 @@ def __init__(self, sender, recipient, route_list):
103106 def serialize (self ):
104107 return json .dumps ({"sender" : self .sender , "recipient" : self .recipient , "route_list" : self .route_list }).encode ("utf-8" )
105108
109+ @classmethod
110+ def from_bytes (cls , data ):
111+ return cls (** json .loads (data ))
112+
106113 def __repr__ (self ):
107- return f"Header: { self .sender } , { self .recipient } , { self .route_list } "
114+ return f"Header( { self .sender } , { self .recipient } , { self .route_list } ) "
108115
109116 def __eq__ (self , other ):
110117 return (self .sender , self .recipient , self .route_list ) == (other .sender , other .recipient , other .route_list )
@@ -117,7 +124,7 @@ def gen_chunks(data, header, msg_id=None, chunksize=2 ** 8):
117124 buf = bytearray (chunksize )
118125 bv = memoryview (buf )
119126 header = header .serialize ()
120- yield Frame (Frame .START_MSG , 1 , len (header ), msg_id , next (seq )).serialize () + header
127+ yield Frame (Frame .HEADER , 1 , len (header ), msg_id , next (seq )).serialize () + header
121128 yield Frame (Frame .PAYLOAD , 1 , len (data ), msg_id , next (seq )).serialize ()
122129 buffer = io .BytesIO (data )
123130 bytes_read = buffer .readinto (buf )
0 commit comments