Skip to content

Commit d51e9c4

Browse files
committed
gsoc pull request
1 parent ae74a99 commit d51e9c4

8 files changed

Lines changed: 1193 additions & 147 deletions

File tree

src/dubbo/classes.py

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,12 @@
1313
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1414
# See the License for the specific language governing permissions and
1515
# limitations under the License.
16+
1617
import abc
1718
import threading
18-
from typing import Any, Callable, Optional, Union
19-
19+
from typing import Any, Callable, Optional, Union,Type
20+
from abc import ABC, abstractmethod
21+
from pydantic import BaseModel
2022
from dubbo.types import DeserializingFunction, RpcType, RpcTypes, SerializingFunction
2123

2224
__all__ = [
@@ -244,3 +246,21 @@ class ReadWriteStream(ReadStream, WriteStream, abc.ABC):
244246
"""
245247

246248
pass
249+
250+
251+
class Codec(ABC):
252+
def __init__(self, model_type: Type[BaseModel] = None, **kwargs):
253+
self.model_type = model_type
254+
255+
@abstractmethod
256+
def encode(self, data: Any) -> bytes:
257+
pass
258+
259+
@abstractmethod
260+
def decode(self, data: bytes) -> Any:
261+
pass
262+
263+
class CodecHelper:
264+
@staticmethod
265+
def get_class():
266+
return Codec

src/dubbo/client.py

Lines changed: 240 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,9 @@
1313
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1414
# See the License for the specific language governing permissions and
1515
# limitations under the License.
16+
1617
import threading
17-
from typing import Optional
18+
from typing import Optional, Callable, List, Type, Union, Any
1819

1920
from dubbo.bootstrap import Dubbo
2021
from dubbo.classes import MethodDescriptor
@@ -31,6 +32,7 @@
3132
SerializingFunction,
3233
)
3334
from dubbo.url import URL
35+
from dubbo.codec import DubboTransportService
3436

3537
__all__ = ["Client"]
3638

@@ -84,68 +86,274 @@ def _initialize(self):
8486

8587
def unary(
8688
self,
87-
method_name: str,
89+
interface: Optional[Callable] = None,
90+
method_name: Optional[str] = None,
91+
params_types: Optional[List[Type]] = None,
92+
return_type: Optional[Type] = None,
93+
codec: Optional[str] = None,
8894
request_serializer: Optional[SerializingFunction] = None,
8995
response_deserializer: Optional[DeserializingFunction] = None,
9096
) -> RpcCallable:
91-
return self._callable(
92-
MethodDescriptor(
93-
method_name=method_name,
94-
arg_serialization=(request_serializer, None),
95-
return_serialization=(None, response_deserializer),
96-
rpc_type=RpcTypes.UNARY.value,
97+
"""
98+
Create unary RPC call.
99+
100+
Supports both automatic mode (via interface) and manual mode (via method_name + params_types + return_type + codec).
101+
"""
102+
103+
# Validate
104+
if interface is None and method_name is None:
105+
raise ValueError("Either 'interface' or 'method_name' must be provided")
106+
107+
# Determine the actual method name to call
108+
actual_method_name = method_name or (interface.__name__ if interface else "unary")
109+
110+
# Build method descriptor (automatic or manual)
111+
if interface:
112+
method_desc = DubboTransportService.create_method_descriptor(
113+
func=interface,
114+
method_name=actual_method_name,
115+
parameter_types=params_types,
116+
return_type=return_type,
117+
interface=interface,
97118
)
119+
else:
120+
# Manual mode fallback: use dummy function for descriptor creation
121+
def dummy(): pass
122+
123+
method_desc = DubboTransportService.create_method_descriptor(
124+
func=dummy,
125+
method_name=actual_method_name,
126+
parameter_types=params_types or [],
127+
return_type=return_type or Any,
128+
)
129+
130+
# Determine serializers if not provided
131+
if request_serializer and response_deserializer:
132+
final_request_serializer = request_serializer
133+
final_response_deserializer = response_deserializer
134+
else:
135+
# Use DubboTransportService to generate serialization functions
136+
final_request_serializer, final_response_deserializer = DubboTransportService.create_serialization_functions(
137+
transport_type=codec or "json",
138+
parameter_types=[p.annotation for p in method_desc.parameters],
139+
return_type=method_desc.return_parameter.annotation,
140+
)
141+
142+
# Create the proper MethodDescriptor for the RPC call
143+
# This should match the structure expected by your RpcCallableFactory
144+
rpc_method_descriptor = MethodDescriptor(
145+
method_name=actual_method_name,
146+
arg_serialization=(final_request_serializer, None), # (serializer, deserializer) for arguments
147+
return_serialization=(None, final_response_deserializer), # (serializer, deserializer) for return value
148+
rpc_type=RpcTypes.UNARY.value,
98149
)
99150

151+
# Create and return the RpcCallable
152+
return self._callable(rpc_method_descriptor)
153+
100154
def client_stream(
101155
self,
102-
method_name: str,
156+
interface: Optional[Callable] = None,
157+
method_name: Optional[str] = None,
158+
params_types: Optional[List[Type]] = None,
159+
return_type: Optional[Type] = None,
160+
codec: Optional[str] = None,
103161
request_serializer: Optional[SerializingFunction] = None,
104162
response_deserializer: Optional[DeserializingFunction] = None,
105163
) -> RpcCallable:
106-
return self._callable(
107-
MethodDescriptor(
108-
method_name=method_name,
109-
arg_serialization=(request_serializer, None),
110-
return_serialization=(None, response_deserializer),
111-
rpc_type=RpcTypes.CLIENT_STREAM.value,
164+
"""
165+
Create client streaming RPC call.
166+
167+
Supports both automatic mode (via interface) and manual mode (via method_name + params_types + return_type + codec).
168+
"""
169+
170+
# Validate
171+
if interface is None and method_name is None:
172+
raise ValueError("Either 'interface' or 'method_name' must be provided")
173+
174+
# Determine the actual method name to call
175+
actual_method_name = method_name or (interface.__name__ if interface else "client_stream")
176+
177+
# Build method descriptor (automatic or manual)
178+
if interface:
179+
method_desc = DubboTransportService.create_method_descriptor(
180+
func=interface,
181+
method_name=actual_method_name,
182+
parameter_types=params_types,
183+
return_type=return_type,
184+
interface=interface,
112185
)
186+
else:
187+
# Manual mode fallback: use dummy function for descriptor creation
188+
def dummy(): pass
189+
190+
method_desc = DubboTransportService.create_method_descriptor(
191+
func=dummy,
192+
method_name=actual_method_name,
193+
parameter_types=params_types or [],
194+
return_type=return_type or Any,
195+
)
196+
197+
# Determine serializers if not provided
198+
if request_serializer and response_deserializer:
199+
final_request_serializer = request_serializer
200+
final_response_deserializer = response_deserializer
201+
else:
202+
# Use DubboTransportService to generate serialization functions
203+
final_request_serializer, final_response_deserializer = DubboTransportService.create_serialization_functions(
204+
transport_type=codec or "json",
205+
parameter_types=[p.annotation for p in method_desc.parameters],
206+
return_type=method_desc.return_parameter.annotation,
207+
)
208+
209+
# Create the proper MethodDescriptor for the RPC call
210+
# This should match the structure expected by your RpcCallableFactory
211+
rpc_method_descriptor = MethodDescriptor(
212+
method_name=actual_method_name,
213+
arg_serialization=(final_request_serializer, None), # (serializer, deserializer) for arguments
214+
return_serialization=(None, final_response_deserializer), # (serializer, deserializer) for return value
215+
rpc_type=RpcTypes.CLIENT_STREAM.value,
113216
)
114217

218+
# Create and return the RpcCallable
219+
return self._callable(rpc_method_descriptor)
220+
115221
def server_stream(
116222
self,
117-
method_name: str,
223+
interface: Optional[Callable] = None,
224+
method_name: Optional[str] = None,
225+
params_types: Optional[List[Type]] = None,
226+
return_type: Optional[Type] = None,
227+
codec: Optional[str] = None,
118228
request_serializer: Optional[SerializingFunction] = None,
119229
response_deserializer: Optional[DeserializingFunction] = None,
120230
) -> RpcCallable:
121-
return self._callable(
122-
MethodDescriptor(
123-
method_name=method_name,
124-
arg_serialization=(request_serializer, None),
125-
return_serialization=(None, response_deserializer),
126-
rpc_type=RpcTypes.SERVER_STREAM.value,
231+
"""
232+
Create server streaming RPC call.
233+
234+
Supports both automatic mode (via interface) and manual mode (via method_name + params_types + return_type + codec).
235+
"""
236+
237+
# Validate
238+
if interface is None and method_name is None:
239+
raise ValueError("Either 'interface' or 'method_name' must be provided")
240+
241+
# Determine the actual method name to call
242+
actual_method_name = method_name or (interface.__name__ if interface else "server_stream")
243+
244+
# Build method descriptor (automatic or manual)
245+
if interface:
246+
method_desc = DubboTransportService.create_method_descriptor(
247+
func=interface,
248+
method_name=actual_method_name,
249+
parameter_types=params_types,
250+
return_type=return_type,
251+
interface=interface,
252+
)
253+
else:
254+
# Manual mode fallback: use dummy function for descriptor creation
255+
def dummy(): pass
256+
257+
method_desc = DubboTransportService.create_method_descriptor(
258+
func=dummy,
259+
method_name=actual_method_name,
260+
parameter_types=params_types or [],
261+
return_type=return_type or Any,
262+
)
263+
264+
# Determine serializers if not provided
265+
if request_serializer and response_deserializer:
266+
final_request_serializer = request_serializer
267+
final_response_deserializer = response_deserializer
268+
else:
269+
# Use DubboTransportService to generate serialization functions
270+
final_request_serializer, final_response_deserializer = DubboTransportService.create_serialization_functions(
271+
transport_type=codec or "json",
272+
parameter_types=[p.annotation for p in method_desc.parameters],
273+
return_type=method_desc.return_parameter.annotation,
127274
)
275+
276+
# Create the proper MethodDescriptor for the RPC call
277+
# This should match the structure expected by your RpcCallableFactory
278+
rpc_method_descriptor = MethodDescriptor(
279+
method_name=actual_method_name,
280+
arg_serialization=(final_request_serializer, None), # (serializer, deserializer) for arguments
281+
return_serialization=(None, final_response_deserializer), # (serializer, deserializer) for return value
282+
rpc_type=RpcTypes.SERVER_STREAM.value,
128283
)
129284

285+
# Create and return the RpcCallable
286+
return self._callable(rpc_method_descriptor)
287+
130288
def bi_stream(
131289
self,
132-
method_name: str,
290+
interface: Optional[Callable] = None,
291+
method_name: Optional[str] = None,
292+
params_types: Optional[List[Type]] = None,
293+
return_type: Optional[Type] = None,
294+
codec: Optional[str] = None,
133295
request_serializer: Optional[SerializingFunction] = None,
134296
response_deserializer: Optional[DeserializingFunction] = None,
135297
) -> RpcCallable:
136-
# create method descriptor
137-
return self._callable(
138-
MethodDescriptor(
139-
method_name=method_name,
140-
arg_serialization=(request_serializer, None),
141-
return_serialization=(None, response_deserializer),
142-
rpc_type=RpcTypes.BI_STREAM.value,
298+
"""
299+
Create bidirectional streaming RPC call.
300+
301+
Supports both automatic mode (via interface) and manual mode (via method_name + params_types + return_type + codec).
302+
"""
303+
304+
# Validate
305+
if interface is None and method_name is None:
306+
raise ValueError("Either 'interface' or 'method_name' must be provided")
307+
308+
# Determine the actual method name to call
309+
actual_method_name = method_name or (interface.__name__ if interface else "bi_stream")
310+
311+
# Build method descriptor (automatic or manual)
312+
if interface:
313+
method_desc = DubboTransportService.create_method_descriptor(
314+
func=interface,
315+
method_name=actual_method_name,
316+
parameter_types=params_types,
317+
return_type=return_type,
318+
interface=interface,
319+
)
320+
else:
321+
# Manual mode fallback: use dummy function for descriptor creation
322+
def dummy(): pass
323+
324+
method_desc = DubboTransportService.create_method_descriptor(
325+
func=dummy,
326+
method_name=actual_method_name,
327+
parameter_types=params_types or [],
328+
return_type=return_type or Any,
143329
)
330+
331+
# Determine serializers if not provided
332+
if request_serializer and response_deserializer:
333+
final_request_serializer = request_serializer
334+
final_response_deserializer = response_deserializer
335+
else:
336+
# Use DubboTransportService to generate serialization functions
337+
final_request_serializer, final_response_deserializer = DubboTransportService.create_serialization_functions(
338+
transport_type=codec or "json",
339+
parameter_types=[p.annotation for p in method_desc.parameters],
340+
return_type=method_desc.return_parameter.annotation,
341+
)
342+
343+
344+
rpc_method_descriptor = MethodDescriptor(
345+
method_name=actual_method_name,
346+
arg_serialization=(final_request_serializer, None),
347+
return_serialization=(None, final_response_deserializer),
348+
rpc_type=RpcTypes.BI_STREAM.value,
144349
)
145350

351+
# Create and return the RpcCallable
352+
return self._callable(rpc_method_descriptor)
353+
146354
def _callable(self, method_descriptor: MethodDescriptor) -> RpcCallable:
147355
"""
148-
Generate a proxy for the given method
356+
Generate a proxy for the given method.
149357
:param method_descriptor: The method descriptor.
150358
:return: The proxy.
151359
:rtype: RpcCallable
@@ -160,4 +368,4 @@ def _callable(self, method_descriptor: MethodDescriptor) -> RpcCallable:
160368
url.attributes[common_constants.METHOD_DESCRIPTOR_KEY] = method_descriptor
161369

162370
# create proxy
163-
return self._callable_factory.get_callable(self._invoker, url)
371+
return self._callable_factory.get_callable(self._invoker, url)

src/dubbo/codec/__init__.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
from .dubbo_codec import DubboTransportService
18+
19+
__all__ = ['DubboTransportService']

0 commit comments

Comments
 (0)