Skip to content

Commit 57b0129

Browse files
committed
completed the protbuf implementation
1 parent 109a1e4 commit 57b0129

5 files changed

Lines changed: 420 additions & 8 deletions

File tree

src/dubbo/client.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -132,11 +132,8 @@ def dummy(): pass
132132
final_response_deserializer = response_deserializer
133133
else:
134134
# Use DubboTransportService to generate serialization functions
135-
final_request_serializer, final_response_deserializer = DubboTransportService.create_serialization_functions(
136-
transport_type=codec or "json",
137-
parameter_types=[p.annotation for p in method_desc.parameters],
138-
return_type=method_desc.return_parameter.annotation,
139-
)
135+
final_request_serializer, final_response_deserializer = DubboTransportService.create_serialization_functions(codec, parameter_types=params_types, return_type=return_type)
136+
print("final",codec, final_request_serializer, final_response_deserializer)
140137

141138
# Create the proper MethodDescriptor for the RPC call
142139
rpc_method_descriptor = MethodDescriptor(

src/dubbo/codec/dubbo_codec.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ def create_encoder_decoder_pair(transport_type: str, parameter_types: List[Type]
8282
return_type=return_type,
8383
**codec_options
8484
)
85+
print("codec_instance", codec_instance.get_encoder(), codec_instance.get_decoder())
8586

8687
return codec_instance.get_encoder(), codec_instance.get_decoder()
8788

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
from .protobuf_codec_handler import ProtobufTransportCodec, ProtobufTransportEncoder, ProtobufTransportDecoder
18+
19+
__all__ = [
20+
"ProtobufTransportCodec", "ProtobufTransportEncoder", "ProtobufTransportDecoder"
21+
]
Lines changed: 305 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,305 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
from typing import Any, Type, Protocol, Optional
18+
from abc import ABC, abstractmethod
19+
import json
20+
from dataclasses import dataclass
21+
22+
# Betterproto imports
23+
try:
24+
import betterproto
25+
HAS_BETTERPROTO = True
26+
except ImportError:
27+
HAS_BETTERPROTO = False
28+
29+
try:
30+
from pydantic import BaseModel
31+
HAS_PYDANTIC = True
32+
except ImportError:
33+
HAS_PYDANTIC = False
34+
35+
# Reuse your existing JSON type system
36+
from dubbo.codec.json_codec.json_type import (
37+
TypeProviderFactory, SerializationState,
38+
SerializationException, DeserializationException
39+
)
40+
41+
42+
class ProtobufEncodingFunction(Protocol):
43+
def __call__(self, obj: Any) -> bytes: ...
44+
45+
46+
class ProtobufDecodingFunction(Protocol):
47+
def __call__(self, data: bytes) -> Any: ...
48+
49+
50+
@dataclass
51+
class ProtobufMethodDescriptor:
52+
"""Protobuf-specific method descriptor for single parameter"""
53+
parameter_type: Type
54+
return_type: Type
55+
protobuf_message_type: Optional[Type] = None
56+
use_json_fallback: bool = False
57+
58+
59+
class ProtobufTypeHandler:
60+
"""Handles type conversion between Python types and Betterproto"""
61+
62+
@staticmethod
63+
def is_betterproto_message(obj_type: Type) -> bool:
64+
"""Check if type is a betterproto message class"""
65+
if not HAS_BETTERPROTO:
66+
return False
67+
try:
68+
return (hasattr(obj_type, '__dataclass_fields__') and
69+
issubclass(obj_type, betterproto.Message))
70+
except (TypeError, AttributeError):
71+
return False
72+
73+
@staticmethod
74+
def is_betterproto_message_instance(obj: Any) -> bool:
75+
"""Check if object is a betterproto message instance"""
76+
if not HAS_BETTERPROTO:
77+
return False
78+
try:
79+
return isinstance(obj, betterproto.Message)
80+
except:
81+
return False
82+
83+
@staticmethod
84+
def is_protobuf_compatible(obj_type: Type) -> bool:
85+
"""Check if type can be handled by protobuf"""
86+
return (obj_type in (str, int, float, bool, bytes) or
87+
ProtobufTypeHandler.is_betterproto_message(obj_type))
88+
89+
@staticmethod
90+
def needs_json_fallback(parameter_type: Type) -> bool:
91+
"""Check if we need JSON fallback for this type"""
92+
return not ProtobufTypeHandler.is_protobuf_compatible(parameter_type)
93+
94+
95+
class ProtobufTransportEncoder:
96+
"""Protobuf encoder for single parameters using betterproto"""
97+
98+
def __init__(self, parameter_type: Type = None, **kwargs):
99+
if not HAS_BETTERPROTO:
100+
raise ImportError("betterproto library is required for ProtobufTransportEncoder")
101+
102+
self.parameter_type = parameter_type
103+
104+
self.descriptor = ProtobufMethodDescriptor(
105+
parameter_type=parameter_type,
106+
return_type=None,
107+
use_json_fallback=ProtobufTypeHandler.needs_json_fallback(parameter_type) if parameter_type else False
108+
)
109+
110+
if self.descriptor.use_json_fallback:
111+
from dubbo.codec.json_codec.json_codec_handler import JsonTransportEncoder
112+
self.json_fallback_encoder = JsonTransportEncoder([parameter_type], **kwargs)
113+
114+
def encode(self, parameter: Any) -> bytes:
115+
"""Encode single parameter to bytes"""
116+
try:
117+
if parameter is None:
118+
return b''
119+
120+
# Handle case where parameter is a tuple (common in RPC calls)
121+
if isinstance(parameter, tuple):
122+
if len(parameter) == 0:
123+
return b''
124+
elif len(parameter) == 1:
125+
return self._encode_single_parameter(parameter[0])
126+
else:
127+
raise SerializationException(f"Multiple parameters not supported. Got tuple with {len(parameter)} elements, expected 1.")
128+
129+
return self._encode_single_parameter(parameter)
130+
131+
except Exception as e:
132+
raise SerializationException(f"Protobuf encoding failed: {e}") from e
133+
134+
def _encode_single_parameter(self, parameter: Any) -> bytes:
135+
"""Encode a single parameter using betterproto"""
136+
# If it's already a betterproto message instance, serialize it
137+
if ProtobufTypeHandler.is_betterproto_message_instance(parameter):
138+
return bytes(parameter)
139+
140+
# If we have type info and it's a betterproto message type
141+
if self.parameter_type and ProtobufTypeHandler.is_betterproto_message(self.parameter_type):
142+
if isinstance(parameter, self.parameter_type):
143+
return bytes(parameter)
144+
elif isinstance(parameter, dict):
145+
# Convert dict to betterproto message
146+
try:
147+
message = self.parameter_type().from_dict(parameter)
148+
return bytes(message)
149+
except Exception as e:
150+
raise SerializationException(f"Cannot convert dict to {self.parameter_type}: {e}")
151+
else:
152+
raise SerializationException(f"Cannot convert {type(parameter)} to {self.parameter_type}")
153+
154+
# Handle primitive types by wrapping in a simple message
155+
if isinstance(parameter, (str, int, float, bool, bytes)):
156+
return self._encode_primitive(parameter)
157+
158+
# Use JSON fallback if configured
159+
if self.descriptor.use_json_fallback:
160+
json_data = self.json_fallback_encoder.encode((parameter,))
161+
return json_data
162+
163+
raise SerializationException(f"Cannot encode {type(parameter)} as protobuf")
164+
165+
def _encode_primitive(self, value: Any) -> bytes:
166+
"""Encode primitive values by wrapping them in a simple structure"""
167+
# For primitives, we'll use JSON encoding wrapped in bytes
168+
# This is a simplified approach - in a real implementation you might
169+
# want to define a wrapper protobuf message for primitives
170+
try:
171+
json_str = json.dumps({"value": value, "type": type(value).__name__})
172+
return json_str.encode('utf-8')
173+
except Exception as e:
174+
raise SerializationException(f"Failed to encode primitive {value}: {e}")
175+
176+
177+
class ProtobufTransportDecoder:
178+
"""Protobuf decoder for single parameters using betterproto"""
179+
180+
def __init__(self, target_type: Type = None, **kwargs):
181+
if not HAS_BETTERPROTO:
182+
raise ImportError("betterproto library is required for ProtobufTransportDecoder")
183+
184+
self.target_type = target_type
185+
self.use_json_fallback = ProtobufTypeHandler.needs_json_fallback(target_type) if target_type else False
186+
187+
if self.use_json_fallback:
188+
from dubbo.codec.json_codec.json_codec_handler import JsonTransportDecoder
189+
self.json_fallback_decoder = JsonTransportDecoder(target_type, **kwargs)
190+
191+
def decode(self, data: bytes) -> Any:
192+
"""Decode bytes to single parameter"""
193+
try:
194+
if not data:
195+
return None
196+
197+
if not self.target_type:
198+
return self._decode_without_type_info(data)
199+
200+
return self._decode_single_parameter(data, self.target_type)
201+
202+
except Exception as e:
203+
raise DeserializationException(f"Protobuf decoding failed: {e}") from e
204+
205+
def _decode_single_parameter(self, data: bytes, target_type: Type) -> Any:
206+
"""Decode single parameter using betterproto"""
207+
if ProtobufTypeHandler.is_betterproto_message(target_type):
208+
try:
209+
# Use betterproto's parsing
210+
message_instance = target_type().parse(data)
211+
return message_instance
212+
except Exception as e:
213+
if self.use_json_fallback:
214+
return self.json_fallback_decoder.decode(data)
215+
raise DeserializationException(f"Failed to parse betterproto message: {e}")
216+
217+
# Handle primitives
218+
elif target_type in (str, int, float, bool, bytes):
219+
return self._decode_primitive(data, target_type)
220+
221+
# Use JSON fallback
222+
elif self.use_json_fallback:
223+
return self.json_fallback_decoder.decode(data)
224+
225+
else:
226+
raise DeserializationException(f"Cannot decode to {target_type} from protobuf")
227+
228+
def _decode_primitive(self, data: bytes, target_type: Type) -> Any:
229+
"""Decode primitive values from their wrapped format"""
230+
try:
231+
json_str = data.decode('utf-8')
232+
parsed = json.loads(json_str)
233+
value = parsed.get("value")
234+
235+
# Convert to target type if needed
236+
if target_type == str:
237+
return str(value)
238+
elif target_type == int:
239+
return int(value)
240+
elif target_type == float:
241+
return float(value)
242+
elif target_type == bool:
243+
return bool(value)
244+
elif target_type == bytes:
245+
return bytes(value) if isinstance(value, (list, bytes)) else str(value).encode()
246+
else:
247+
return value
248+
249+
except Exception as e:
250+
raise DeserializationException(f"Failed to decode primitive: {e}")
251+
252+
def _decode_without_type_info(self, data: bytes) -> Any:
253+
"""Decode without type information - try JSON first"""
254+
try:
255+
return json.loads(data.decode('utf-8'))
256+
except:
257+
return data
258+
259+
260+
class ProtobufTransportCodec:
261+
"""Main protobuf codec class for single parameters using betterproto"""
262+
263+
def __init__(self, parameter_type: Type = None, return_type: Type = None, **kwargs):
264+
if not HAS_BETTERPROTO:
265+
raise ImportError("betterproto library is required for ProtobufTransportCodec")
266+
267+
self.parameter_type = parameter_type
268+
self.return_type = return_type
269+
270+
self._encoder = ProtobufTransportEncoder(
271+
parameter_type=parameter_type,
272+
**kwargs
273+
)
274+
self._decoder = ProtobufTransportDecoder(
275+
target_type=return_type,
276+
**kwargs
277+
)
278+
279+
def encode_parameter(self, argument: Any) -> bytes:
280+
"""Encode single parameter"""
281+
return self._encoder.encode(argument)
282+
283+
def encode_parameters(self, arguments: tuple) -> bytes:
284+
"""Legacy method to handle tuple of arguments (for backward compatibility)"""
285+
if not arguments:
286+
return b''
287+
if len(arguments) == 1:
288+
return self._encoder.encode(arguments[0])
289+
else:
290+
raise SerializationException(f"Multiple parameters not supported. Got {len(arguments)} arguments, expected 1.")
291+
292+
def decode_return_value(self, data: bytes) -> Any:
293+
"""Decode return value"""
294+
return self._decoder.decode(data)
295+
296+
def get_encoder(self) -> ProtobufTransportEncoder:
297+
return self._encoder
298+
299+
def get_decoder(self) -> ProtobufTransportDecoder:
300+
return self._decoder
301+
302+
303+
def create_protobuf_codec(**kwargs) -> ProtobufTransportCodec:
304+
"""Factory function to create protobuf codec"""
305+
return ProtobufTransportCodec(**kwargs)

0 commit comments

Comments
 (0)