Skip to content

Commit 74aee76

Browse files
jerryyummyyouyun.0601wqliang
authored
[ISSUE #5208] develop mcp protocol (#5203)
* basic arch * refine * basic arch * build the basic mcp server without streamable http * build the basic mcp server without streamable http * build the basic mcp server without streamable http * build the basic mcp server with streamable http * build the basic mcp server * build the basic mcp server * build the basic mcp server * build the basic mcp server * build the basic mcp server * build the basic mcp server * build the basic mcp server * build the basic mcp server * build the basic mcp server * Update RemoteSubscribeInstance.java * Update McpSinkHandlerRetryWrapper.java * Update CommonMcpSinkHandler.java * Update McpSinkConnector.java * Update McpSinkHandler.java * Update McpExportRecord.java * Update McpConnectRecord.java * Update McpExportRecordPage.java * Update McpExportMetadata.java * Update McpSourceConnector.java * Update McpToolRegistry.java * Update McpServerConfig.java * Update Protocol.java * Update McpSourceConnector.java * Update McpSourceConstants.java * Update McpStandardProtocol.java * Update McpRequest.java * Update McpResponse.java * Update McpSinkHandlerRetryWrapper.java * Update AbstractMcpSinkHandler.java --------- Co-authored-by: youyun.0601 <youyun.0601@bytedance.com> Co-authored-by: wqliang <wqliang@users.noreply.github.com>
1 parent 436ce78 commit 74aee76

38 files changed

Lines changed: 3719 additions & 3 deletions
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.common.config.connector.mcp;
19+
20+
import lombok.Data;
21+
22+
@Data
23+
public class McpRetryConfig {
24+
// maximum number of retries, default 2, minimum 0
25+
private int maxRetries = 2;
26+
27+
// retry interval, default 1000ms
28+
private int interval = 1000;
29+
30+
// Default value is false, indicating that only requests with network-level errors will be retried.
31+
// If set to true, all failed requests will be retried, including network-level errors and non-2xx responses.
32+
private boolean retryOnNonSuccess = false;
33+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.common.config.connector.mcp;
19+
20+
import org.apache.eventmesh.common.config.connector.SinkConfig;
21+
22+
import lombok.Data;
23+
import lombok.EqualsAndHashCode;
24+
25+
26+
@Data
27+
@EqualsAndHashCode(callSuper = true)
28+
public class McpSinkConfig extends SinkConfig {
29+
30+
public SinkConnectorConfig connectorConfig;
31+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.common.config.connector.mcp;
19+
20+
import org.apache.eventmesh.common.config.connector.SourceConfig;
21+
22+
import lombok.Data;
23+
import lombok.EqualsAndHashCode;
24+
25+
26+
@Data
27+
@EqualsAndHashCode(callSuper = true)
28+
public class McpSourceConfig extends SourceConfig {
29+
30+
public SourceConnectorConfig connectorConfig;
31+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.common.config.connector.mcp;
19+
20+
21+
import lombok.Data;
22+
23+
@Data
24+
public class SinkConnectorConfig {
25+
26+
private String connectorName;
27+
28+
private String[] urls;
29+
30+
// keepAlive, default true
31+
private boolean keepAlive = true;
32+
33+
// timeunit: ms, default 60000ms
34+
private int keepAliveTimeout = 60 * 1000; // Keep units consistent
35+
36+
// timeunit: ms, default 5000ms, recommended scope: 5000ms - 10000ms
37+
private int connectionTimeout = 5000;
38+
39+
// timeunit: ms, default 5000ms
40+
private int idleTimeout = 5000;
41+
42+
// maximum number of HTTP/1 connections a client will pool, default 50
43+
private int maxConnectionPoolSize = 50;
44+
45+
// retry config
46+
private McpRetryConfig retryConfig = new McpRetryConfig();
47+
48+
private String deliveryStrategy = "ROUND_ROBIN";
49+
50+
private boolean skipDeliverException = false;
51+
52+
// managed pipelining param, default true
53+
private boolean isParallelized = true;
54+
55+
private int parallelism = 2;
56+
57+
58+
/**
59+
* Fill default values if absent (When there are multiple default values for a field)
60+
*
61+
* @param config SinkConnectorConfig
62+
*/
63+
public static void populateFieldsWithDefaults(SinkConnectorConfig config) {
64+
/*
65+
* set default values for idleTimeout
66+
* recommended scope: common(5s - 10s), webhook(15s - 30s)
67+
*/
68+
final int commonHttpIdleTimeout = 5000;
69+
70+
// Set default values for idleTimeout
71+
if (config.getIdleTimeout() == 0) {
72+
config.setIdleTimeout(commonHttpIdleTimeout);
73+
}
74+
75+
}
76+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.common.config.connector.mcp;
19+
20+
import java.util.HashMap;
21+
import java.util.Map;
22+
23+
import lombok.Data;
24+
25+
@Data
26+
public class SourceConnectorConfig {
27+
28+
private String connectorName;
29+
30+
private String path = "/";
31+
32+
private int port;
33+
34+
// timeunit: ms, default 5000ms
35+
private int idleTimeout = 5000;
36+
37+
/**
38+
* <ul>
39+
* <li>The maximum size allowed for form attributes when Content-Type is application/x-www-form-urlencoded or multipart/form-data </li>
40+
* <li>Default is 1MB (1024 * 1024 bytes). </li>
41+
* <li>If you receive a "size exceed allowed maximum capacity" error, you can increase this value. </li>
42+
* <li>Note: This applies only when handling form data submissions.</li>
43+
* </ul>
44+
*/
45+
private int maxFormAttributeSize = 1024 * 1024;
46+
47+
// max size of the queue, default 1000
48+
private int maxStorageSize = 1000;
49+
50+
// batch size, default 10
51+
private int batchSize = 10;
52+
53+
// protocol, default CloudEvent
54+
private String protocol = "Mcp";
55+
56+
// extra config, e.g. GitHub secret
57+
private Map<String, String> extraConfig = new HashMap<>();
58+
59+
// data consistency enabled, default true
60+
private boolean dataConsistencyEnabled = false;
61+
62+
private String forwardPath;
63+
}

eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,17 @@ public static <T> T parseObject(byte[] bytes, Class<T> clazz) {
162162
}
163163
}
164164

165+
public static <T> T parseObject(String text, TypeReference<T> typeReference) {
166+
if (StringUtils.isEmpty(text)) {
167+
return null;
168+
}
169+
try {
170+
return OBJECT_MAPPER.readValue(text, typeReference);
171+
} catch (JsonProcessingException e) {
172+
throw new JsonException("deserialize json string to object error", e);
173+
}
174+
}
175+
165176
/**
166177
* parse json string to object.
167178
*

eventmesh-connectors/eventmesh-connector-http/src/main/resources/server-config.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,4 @@
1616
#
1717

1818
sourceEnable: true
19-
sinkEnable: false
19+
sinkEnable: true
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
dependencies {
19+
api project(":eventmesh-openconnect:eventmesh-openconnect-java")
20+
implementation project(":eventmesh-common")
21+
implementation project(":eventmesh-connectors:eventmesh-connector-http")
22+
implementation project(":eventmesh-protocol-plugin:eventmesh-protocol-api")
23+
implementation "io.cloudevents:cloudevents-core"
24+
implementation "com.google.guava:guava"
25+
implementation "io.cloudevents:cloudevents-json-jackson"
26+
implementation ("io.grpc:grpc-protobuf:1.68.0") {
27+
exclude group: "com.google.protobuf", module: "protobuf-java"
28+
}
29+
implementation 'io.cloudevents:cloudevents-http-vertx:3.0.0'
30+
implementation 'io.vertx:vertx-web:4.5.8'
31+
implementation 'io.vertx:vertx-web-client:4.5.9'
32+
implementation 'dev.failsafe:failsafe:3.3.2'
33+
34+
35+
testImplementation 'org.apache.httpcomponents.client5:httpclient5:5.4'
36+
testImplementation 'org.apache.httpcomponents.client5:httpclient5-fluent:5.4'
37+
testImplementation 'org.mock-server:mockserver-netty:5.15.0'
38+
implementation 'io.netty:netty-codec-http:4.1.114.Final'
39+
compileOnly 'org.projectlombok:lombok'
40+
annotationProcessor 'org.projectlombok:lombok'
41+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
pluginType=connector
18+
pluginName=mcp
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
19+
package org.apache.eventmesh.connector.mcp.config;
20+
21+
import org.apache.eventmesh.common.config.connector.Config;
22+
23+
import lombok.Data;
24+
import lombok.EqualsAndHashCode;
25+
26+
27+
@Data
28+
@EqualsAndHashCode(callSuper = true)
29+
public class McpServerConfig extends Config {
30+
31+
private boolean sourceEnable;
32+
33+
private boolean sinkEnable;
34+
}

0 commit comments

Comments
 (0)