Skip to content

Commit a785c34

Browse files
committed
fix the duplicate code issue in client.py
1 parent d51e9c4 commit a785c34

1 file changed

Lines changed: 60 additions & 154 deletions

File tree

src/dubbo/client.py

Lines changed: 60 additions & 154 deletions
Original file line numberDiff line numberDiff line change
@@ -84,28 +84,27 @@ def _initialize(self):
8484

8585
self._initialized = True
8686

87-
def unary(
87+
def _create_rpc_callable(
8888
self,
89+
rpc_type: str,
8990
interface: Optional[Callable] = None,
9091
method_name: Optional[str] = None,
9192
params_types: Optional[List[Type]] = None,
9293
return_type: Optional[Type] = None,
9394
codec: Optional[str] = None,
9495
request_serializer: Optional[SerializingFunction] = None,
9596
response_deserializer: Optional[DeserializingFunction] = None,
97+
default_method_name: str = "rpc_call",
9698
) -> RpcCallable:
9799
"""
98-
Create unary RPC call.
99-
100-
Supports both automatic mode (via interface) and manual mode (via method_name + params_types + return_type + codec).
100+
Create RPC callable with the specified type.
101101
"""
102-
103102
# Validate
104103
if interface is None and method_name is None:
105104
raise ValueError("Either 'interface' or 'method_name' must be provided")
106105

107106
# Determine the actual method name to call
108-
actual_method_name = method_name or (interface.__name__ if interface else "unary")
107+
actual_method_name = method_name or (interface.__name__ if interface else default_method_name)
109108

110109
# Build method descriptor (automatic or manual)
111110
if interface:
@@ -140,18 +139,17 @@ def dummy(): pass
140139
)
141140

142141
# Create the proper MethodDescriptor for the RPC call
143-
# This should match the structure expected by your RpcCallableFactory
144142
rpc_method_descriptor = MethodDescriptor(
145143
method_name=actual_method_name,
146144
arg_serialization=(final_request_serializer, None), # (serializer, deserializer) for arguments
147145
return_serialization=(None, final_response_deserializer), # (serializer, deserializer) for return value
148-
rpc_type=RpcTypes.UNARY.value,
146+
rpc_type=rpc_type,
149147
)
150148

151149
# Create and return the RpcCallable
152150
return self._callable(rpc_method_descriptor)
153151

154-
def client_stream(
152+
def unary(
155153
self,
156154
interface: Optional[Callable] = None,
157155
method_name: Optional[str] = None,
@@ -162,62 +160,49 @@ def client_stream(
162160
response_deserializer: Optional[DeserializingFunction] = None,
163161
) -> RpcCallable:
164162
"""
165-
Create client streaming RPC call.
163+
Create unary RPC call.
166164
167165
Supports both automatic mode (via interface) and manual mode (via method_name + params_types + return_type + codec).
168166
"""
167+
return self._create_rpc_callable(
168+
rpc_type=RpcTypes.UNARY.value,
169+
interface=interface,
170+
method_name=method_name,
171+
params_types=params_types,
172+
return_type=return_type,
173+
codec=codec,
174+
request_serializer=request_serializer,
175+
response_deserializer=response_deserializer,
176+
default_method_name="unary",
177+
)
169178

170-
# Validate
171-
if interface is None and method_name is None:
172-
raise ValueError("Either 'interface' or 'method_name' must be provided")
173-
174-
# Determine the actual method name to call
175-
actual_method_name = method_name or (interface.__name__ if interface else "client_stream")
176-
177-
# Build method descriptor (automatic or manual)
178-
if interface:
179-
method_desc = DubboTransportService.create_method_descriptor(
180-
func=interface,
181-
method_name=actual_method_name,
182-
parameter_types=params_types,
183-
return_type=return_type,
184-
interface=interface,
185-
)
186-
else:
187-
# Manual mode fallback: use dummy function for descriptor creation
188-
def dummy(): pass
189-
190-
method_desc = DubboTransportService.create_method_descriptor(
191-
func=dummy,
192-
method_name=actual_method_name,
193-
parameter_types=params_types or [],
194-
return_type=return_type or Any,
195-
)
196-
197-
# Determine serializers if not provided
198-
if request_serializer and response_deserializer:
199-
final_request_serializer = request_serializer
200-
final_response_deserializer = response_deserializer
201-
else:
202-
# Use DubboTransportService to generate serialization functions
203-
final_request_serializer, final_response_deserializer = DubboTransportService.create_serialization_functions(
204-
transport_type=codec or "json",
205-
parameter_types=[p.annotation for p in method_desc.parameters],
206-
return_type=method_desc.return_parameter.annotation,
207-
)
179+
def client_stream(
180+
self,
181+
interface: Optional[Callable] = None,
182+
method_name: Optional[str] = None,
183+
params_types: Optional[List[Type]] = None,
184+
return_type: Optional[Type] = None,
185+
codec: Optional[str] = None,
186+
request_serializer: Optional[SerializingFunction] = None,
187+
response_deserializer: Optional[DeserializingFunction] = None,
188+
) -> RpcCallable:
189+
"""
190+
Create client streaming RPC call.
208191
209-
# Create the proper MethodDescriptor for the RPC call
210-
# This should match the structure expected by your RpcCallableFactory
211-
rpc_method_descriptor = MethodDescriptor(
212-
method_name=actual_method_name,
213-
arg_serialization=(final_request_serializer, None), # (serializer, deserializer) for arguments
214-
return_serialization=(None, final_response_deserializer), # (serializer, deserializer) for return value
192+
Supports both automatic mode (via interface) and manual mode (via method_name + params_types + return_type + codec).
193+
"""
194+
return self._create_rpc_callable(
215195
rpc_type=RpcTypes.CLIENT_STREAM.value,
196+
interface=interface,
197+
method_name=method_name,
198+
params_types=params_types,
199+
return_type=return_type,
200+
codec=codec,
201+
request_serializer=request_serializer,
202+
response_deserializer=response_deserializer,
203+
default_method_name="client_stream",
216204
)
217205

218-
# Create and return the RpcCallable
219-
return self._callable(rpc_method_descriptor)
220-
221206
def server_stream(
222207
self,
223208
interface: Optional[Callable] = None,
@@ -233,58 +218,18 @@ def server_stream(
233218
234219
Supports both automatic mode (via interface) and manual mode (via method_name + params_types + return_type + codec).
235220
"""
236-
237-
# Validate
238-
if interface is None and method_name is None:
239-
raise ValueError("Either 'interface' or 'method_name' must be provided")
240-
241-
# Determine the actual method name to call
242-
actual_method_name = method_name or (interface.__name__ if interface else "server_stream")
243-
244-
# Build method descriptor (automatic or manual)
245-
if interface:
246-
method_desc = DubboTransportService.create_method_descriptor(
247-
func=interface,
248-
method_name=actual_method_name,
249-
parameter_types=params_types,
250-
return_type=return_type,
251-
interface=interface,
252-
)
253-
else:
254-
# Manual mode fallback: use dummy function for descriptor creation
255-
def dummy(): pass
256-
257-
method_desc = DubboTransportService.create_method_descriptor(
258-
func=dummy,
259-
method_name=actual_method_name,
260-
parameter_types=params_types or [],
261-
return_type=return_type or Any,
262-
)
263-
264-
# Determine serializers if not provided
265-
if request_serializer and response_deserializer:
266-
final_request_serializer = request_serializer
267-
final_response_deserializer = response_deserializer
268-
else:
269-
# Use DubboTransportService to generate serialization functions
270-
final_request_serializer, final_response_deserializer = DubboTransportService.create_serialization_functions(
271-
transport_type=codec or "json",
272-
parameter_types=[p.annotation for p in method_desc.parameters],
273-
return_type=method_desc.return_parameter.annotation,
274-
)
275-
276-
# Create the proper MethodDescriptor for the RPC call
277-
# This should match the structure expected by your RpcCallableFactory
278-
rpc_method_descriptor = MethodDescriptor(
279-
method_name=actual_method_name,
280-
arg_serialization=(final_request_serializer, None), # (serializer, deserializer) for arguments
281-
return_serialization=(None, final_response_deserializer), # (serializer, deserializer) for return value
221+
return self._create_rpc_callable(
282222
rpc_type=RpcTypes.SERVER_STREAM.value,
223+
interface=interface,
224+
method_name=method_name,
225+
params_types=params_types,
226+
return_type=return_type,
227+
codec=codec,
228+
request_serializer=request_serializer,
229+
response_deserializer=response_deserializer,
230+
default_method_name="server_stream",
283231
)
284232

285-
# Create and return the RpcCallable
286-
return self._callable(rpc_method_descriptor)
287-
288233
def bi_stream(
289234
self,
290235
interface: Optional[Callable] = None,
@@ -300,57 +245,18 @@ def bi_stream(
300245
301246
Supports both automatic mode (via interface) and manual mode (via method_name + params_types + return_type + codec).
302247
"""
303-
304-
# Validate
305-
if interface is None and method_name is None:
306-
raise ValueError("Either 'interface' or 'method_name' must be provided")
307-
308-
# Determine the actual method name to call
309-
actual_method_name = method_name or (interface.__name__ if interface else "bi_stream")
310-
311-
# Build method descriptor (automatic or manual)
312-
if interface:
313-
method_desc = DubboTransportService.create_method_descriptor(
314-
func=interface,
315-
method_name=actual_method_name,
316-
parameter_types=params_types,
317-
return_type=return_type,
318-
interface=interface,
319-
)
320-
else:
321-
# Manual mode fallback: use dummy function for descriptor creation
322-
def dummy(): pass
323-
324-
method_desc = DubboTransportService.create_method_descriptor(
325-
func=dummy,
326-
method_name=actual_method_name,
327-
parameter_types=params_types or [],
328-
return_type=return_type or Any,
329-
)
330-
331-
# Determine serializers if not provided
332-
if request_serializer and response_deserializer:
333-
final_request_serializer = request_serializer
334-
final_response_deserializer = response_deserializer
335-
else:
336-
# Use DubboTransportService to generate serialization functions
337-
final_request_serializer, final_response_deserializer = DubboTransportService.create_serialization_functions(
338-
transport_type=codec or "json",
339-
parameter_types=[p.annotation for p in method_desc.parameters],
340-
return_type=method_desc.return_parameter.annotation,
341-
)
342-
343-
344-
rpc_method_descriptor = MethodDescriptor(
345-
method_name=actual_method_name,
346-
arg_serialization=(final_request_serializer, None),
347-
return_serialization=(None, final_response_deserializer),
248+
return self._create_rpc_callable(
348249
rpc_type=RpcTypes.BI_STREAM.value,
250+
interface=interface,
251+
method_name=method_name,
252+
params_types=params_types,
253+
return_type=return_type,
254+
codec=codec,
255+
request_serializer=request_serializer,
256+
response_deserializer=response_deserializer,
257+
default_method_name="bi_stream",
349258
)
350259

351-
# Create and return the RpcCallable
352-
return self._callable(rpc_method_descriptor)
353-
354260
def _callable(self, method_descriptor: MethodDescriptor) -> RpcCallable:
355261
"""
356262
Generate a proxy for the given method.

0 commit comments

Comments
 (0)