-
Notifications
You must be signed in to change notification settings - Fork 102
gsoc containing json serialization handler and dubbo codec #51
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 14 commits
d51e9c4
a785c34
95321d7
109a1e4
57b0129
9cd492e
6e13e9f
85d0bdd
b94f3cf
46e660c
1ff7083
dfa1b07
ed8c30b
da70485
fc6e5dd
598703c
65de7d6
fbb9d81
4f914b5
e12f69e
3424ccb
1393855
23f21d9
e3a5244
1b5d668
c2f1424
25a0496
bd2b42b
a5fd49e
5a4b4eb
fc96853
1dc61a0
bfee72d
96b02ed
91a4ba8
ebbb532
552137c
dea7c78
db9f70a
e016672
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,13 +8,15 @@ | |
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # Unless required by applicable law or agreed to in writing, | ||
| # software distributed under the License is distributed on an "AS IS" BASIS, | ||
|
aditya0yadav marked this conversation as resolved.
Outdated
|
||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| import threading | ||
| from typing import Optional | ||
| import inspect | ||
| from typing import Optional, Callable, List, Type, Any, get_type_hints | ||
|
|
||
| from dubbo.bootstrap import Dubbo | ||
| from dubbo.classes import MethodDescriptor | ||
|
|
@@ -31,6 +33,7 @@ | |
| SerializingFunction, | ||
| ) | ||
| from dubbo.url import URL | ||
| from dubbo.codec import DubboTransportService | ||
|
|
||
| __all__ = ["Client"] | ||
|
|
||
|
|
@@ -61,11 +64,17 @@ def _initialize(self): | |
| return | ||
|
|
||
| # get the protocol | ||
| protocol = extensionLoader.get_extension(Protocol, self._reference.protocol)() | ||
| protocol = extensionLoader.get_extension( | ||
| Protocol, self._reference.protocol | ||
| )() | ||
|
|
||
| registry_config = self._dubbo.registry_config | ||
|
|
||
| self._protocol = RegistryProtocol(registry_config, protocol) if self._dubbo.registry_config else protocol | ||
| self._protocol = ( | ||
| RegistryProtocol(registry_config, protocol) | ||
| if registry_config | ||
| else protocol | ||
| ) | ||
|
|
||
| # build url | ||
| reference_url = self._reference.to_url() | ||
|
|
@@ -82,82 +91,116 @@ def _initialize(self): | |
|
|
||
| self._initialized = True | ||
|
|
||
| def unary( | ||
| @classmethod | ||
|
aditya0yadav marked this conversation as resolved.
Outdated
|
||
| def _infer_types_from_interface(cls, interface: Callable) -> tuple: | ||
| """ | ||
| Infer method name, parameter types, and return type from a callable. | ||
| """ | ||
| try: | ||
| type_hints = get_type_hints(interface) | ||
| sig = inspect.signature(interface) | ||
| method_name = interface.__name__ | ||
| params = list(sig.parameters.values()) | ||
|
|
||
| # skip 'self' for bound methods | ||
| if params and params[0].name == "self": | ||
| params = params[1:] | ||
|
|
||
| param_types = [type_hints.get(p.name, Any) for p in params] | ||
| return_type = type_hints.get("return", Any) | ||
|
|
||
| return method_name, param_types, return_type | ||
| except Exception: | ||
| return interface.__name__, [Any], Any | ||
|
|
||
| def _create_rpc_callable( | ||
| self, | ||
| method_name: str, | ||
| rpc_type: str, | ||
| interface: Optional[Callable] = None, | ||
| method_name: Optional[str] = None, | ||
| params_types: Optional[List[Type]] = None, | ||
| return_type: Optional[Type] = None, | ||
| codec: Optional[str] = None, | ||
| request_serializer: Optional[SerializingFunction] = None, | ||
| response_deserializer: Optional[DeserializingFunction] = None, | ||
| default_method_name: str = "rpc_call", | ||
|
aditya0yadav marked this conversation as resolved.
Outdated
|
||
| ) -> RpcCallable: | ||
| return self._callable( | ||
| MethodDescriptor( | ||
| method_name=method_name, | ||
| arg_serialization=(request_serializer, None), | ||
| return_serialization=(None, response_deserializer), | ||
| rpc_type=RpcTypes.UNARY.value, | ||
| """ | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I also hope to see the implementation on the server side
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @cnzakii like most of the work of server can be handle by the rpc handler or
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can review the proposal I sent in Slack, which also contains the corresponding Server interface design. You should also implement the construction method descriptors on the Server side, select the serialization method, and so on.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok |
||
| Create RPC callable with the specified type. | ||
| """ | ||
| if interface is None and method_name is None: | ||
| raise ValueError("Either 'interface' or 'method_name' must be provided") | ||
|
|
||
| # Start with explicit values | ||
| m_name = method_name | ||
| p_types = params_types | ||
| r_type = return_type | ||
|
|
||
| # Infer from interface if needed | ||
| if interface: | ||
| if p_types is None or r_type is None or m_name is None: | ||
| inf_name, inf_params, inf_return = self._infer_types_from_interface( | ||
| interface | ||
| ) | ||
| m_name = m_name or inf_name | ||
| p_types = p_types or inf_params | ||
| r_type = r_type or inf_return | ||
|
|
||
| # Fallback to default | ||
| m_name = m_name or default_method_name | ||
|
|
||
| # Determine serializers | ||
| if request_serializer and response_deserializer: | ||
| req_ser = request_serializer | ||
| res_deser = response_deserializer | ||
| else: | ||
| req_ser, res_deser = DubboTransportService.create_serialization_functions( | ||
| codec or "json", # fallback to json | ||
|
aditya0yadav marked this conversation as resolved.
Outdated
|
||
| parameter_types=p_types, | ||
| return_type=r_type, | ||
| ) | ||
|
|
||
| # Create MethodDescriptor | ||
| descriptor = MethodDescriptor( | ||
| method_name=m_name, | ||
| arg_serialization=(req_ser, None), | ||
| return_serialization=(None, res_deser), | ||
| rpc_type=rpc_type, | ||
| ) | ||
|
|
||
| def client_stream( | ||
| self, | ||
| method_name: str, | ||
| request_serializer: Optional[SerializingFunction] = None, | ||
| response_deserializer: Optional[DeserializingFunction] = None, | ||
| ) -> RpcCallable: | ||
| return self._callable( | ||
| MethodDescriptor( | ||
| method_name=method_name, | ||
| arg_serialization=(request_serializer, None), | ||
| return_serialization=(None, response_deserializer), | ||
| rpc_type=RpcTypes.CLIENT_STREAM.value, | ||
| ) | ||
| return self._callable(descriptor) | ||
|
|
||
| def unary(self, **kwargs) -> RpcCallable: | ||
| return self._create_rpc_callable( | ||
| rpc_type=RpcTypes.UNARY.value, default_method_name="unary", **kwargs | ||
| ) | ||
|
|
||
| def server_stream( | ||
| self, | ||
| method_name: str, | ||
| request_serializer: Optional[SerializingFunction] = None, | ||
| response_deserializer: Optional[DeserializingFunction] = None, | ||
| ) -> RpcCallable: | ||
| return self._callable( | ||
| MethodDescriptor( | ||
| method_name=method_name, | ||
| arg_serialization=(request_serializer, None), | ||
| return_serialization=(None, response_deserializer), | ||
| rpc_type=RpcTypes.SERVER_STREAM.value, | ||
| ) | ||
| def client_stream(self, **kwargs) -> RpcCallable: | ||
| return self._create_rpc_callable( | ||
| rpc_type=RpcTypes.CLIENT_STREAM.value, | ||
| default_method_name="client_stream", | ||
| **kwargs, | ||
| ) | ||
|
|
||
| def bi_stream( | ||
| self, | ||
| method_name: str, | ||
| request_serializer: Optional[SerializingFunction] = None, | ||
| response_deserializer: Optional[DeserializingFunction] = None, | ||
| ) -> RpcCallable: | ||
| # create method descriptor | ||
| return self._callable( | ||
| MethodDescriptor( | ||
| method_name=method_name, | ||
| arg_serialization=(request_serializer, None), | ||
| return_serialization=(None, response_deserializer), | ||
| rpc_type=RpcTypes.BI_STREAM.value, | ||
| ) | ||
| def server_stream(self, **kwargs) -> RpcCallable: | ||
| return self._create_rpc_callable( | ||
| rpc_type=RpcTypes.SERVER_STREAM.value, | ||
| default_method_name="server_stream", | ||
| **kwargs, | ||
| ) | ||
|
|
||
| def bi_stream(self, **kwargs) -> RpcCallable: | ||
| return self._create_rpc_callable( | ||
| rpc_type=RpcTypes.BI_STREAM.value, default_method_name="bi_stream", **kwargs | ||
| ) | ||
|
|
||
| def _callable(self, method_descriptor: MethodDescriptor) -> RpcCallable: | ||
| """ | ||
| Generate a proxy for the given method | ||
| :param method_descriptor: The method descriptor. | ||
| :return: The proxy. | ||
| :rtype: RpcCallable | ||
| Generate a proxy for the given method. | ||
| """ | ||
| # get invoker | ||
| url = self._invoker.get_url() | ||
|
|
||
| # clone url | ||
| url = url.copy() | ||
| url.parameters[common_constants.METHOD_KEY] = method_descriptor.get_method_name() | ||
| # set method descriptor | ||
| url = self._invoker.get_url().copy() | ||
| url.parameters[common_constants.METHOD_KEY] = ( | ||
| method_descriptor.get_method_name() | ||
| ) | ||
| url.attributes[common_constants.METHOD_DESCRIPTOR_KEY] = method_descriptor | ||
|
|
||
| # create proxy | ||
| return self._callable_factory.get_callable(self._invoker, url) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,19 @@ | ||
| # | ||
| # Licensed to the Apache Software Foundation (ASF) under one or more | ||
| # contributor license agreements. See the NOTICE file distributed with | ||
| # this work for additional information regarding copyright ownership. | ||
| # The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| # (the "License"); you may not use this file except in compliance with | ||
| # the License. You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| from .dubbo_codec import DubboTransportService | ||
|
|
||
| __all__ = ['DubboTransportService'] |
Uh oh!
There was an error while loading. Please reload this page.