Skip to content

Commit 26c8d52

Browse files
committed
fix(mcp): resolve SSE truncation and I/O thread blocking in streamable transport
This commit fixes a race condition where the Streamable HTTP transport would randomly return an empty payload ("") instead of the tool execution result. The failure occurred because the Jooby I/O thread was being blocked, causing the server to tear down the TCP socket before the final SSE chunk could be flushed to the network. Details: * Replaced `.block()` calls inside `ctx.upgrade()` with non-blocking Reactor `.subscribe()` chains to prevent I/O thread deadlocks. * Refactored `notifyClients`, `closeGracefully`, and the `lastId` replay loop to use native Reactor `Flux` chains instead of blocking Java `.parallelStream().forEach()` loops. * Added a 50ms `Mono.delay` to `JoobyStreamableMcpSessionTransport.closeGracefully()`. This guarantees the underlying server (e.g., Undertow) has a sufficient buffer window to physically flush the final SSE event to the network layer before the connection is destroyed.
1 parent d68adc4 commit 26c8d52

5 files changed

Lines changed: 253 additions & 441 deletions

File tree

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Jooby https://jooby.io
3+
* Apache License Version 2.0 https://jooby.io/LICENSE.txt
4+
* Copyright 2014 Edgar Espina
5+
*/
6+
package io.jooby.mcp.transport;
7+
8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
10+
11+
import io.modelcontextprotocol.json.McpJsonMapper;
12+
import io.modelcontextprotocol.json.TypeRef;
13+
import io.modelcontextprotocol.spec.McpServerTransport;
14+
import reactor.core.publisher.Mono;
15+
16+
public abstract class AbstractMcpTransport implements McpServerTransport {
17+
protected final Logger log = LoggerFactory.getLogger(getClass());
18+
19+
protected final McpJsonMapper mcpJsonMapper;
20+
21+
public AbstractMcpTransport(McpJsonMapper mcpJsonMapper) {
22+
this.mcpJsonMapper = mcpJsonMapper;
23+
}
24+
25+
@Override
26+
public <T> T unmarshalFrom(Object data, TypeRef<T> typeRef) {
27+
return mcpJsonMapper.convertValue(data, typeRef);
28+
}
29+
30+
@Override
31+
public Mono<Void> closeGracefully() {
32+
return Mono.fromRunnable(this::close);
33+
}
34+
}
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Jooby https://jooby.io
3+
* Apache License Version 2.0 https://jooby.io/LICENSE.txt
4+
* Copyright 2014 Edgar Espina
5+
*/
6+
package io.jooby.mcp.transport;
7+
8+
import java.util.concurrent.ConcurrentHashMap;
9+
import java.util.concurrent.atomic.AtomicBoolean;
10+
11+
import org.slf4j.Logger;
12+
import org.slf4j.LoggerFactory;
13+
14+
import io.jooby.Context;
15+
import io.modelcontextprotocol.json.McpJsonMapper;
16+
import io.modelcontextprotocol.server.McpTransportContextExtractor;
17+
import io.modelcontextprotocol.spec.McpServerSession;
18+
import io.modelcontextprotocol.spec.McpServerTransportProvider;
19+
import reactor.core.publisher.Flux;
20+
import reactor.core.publisher.Mono;
21+
22+
public abstract class AbstractMcpTransportProvider implements McpServerTransportProvider {
23+
24+
protected final Logger log = LoggerFactory.getLogger(getClass());
25+
26+
protected final McpJsonMapper mcpJsonMapper;
27+
protected final McpTransportContextExtractor<Context> contextExtractor;
28+
protected final ConcurrentHashMap<String, McpServerSession> sessions = new ConcurrentHashMap<>();
29+
protected final AtomicBoolean isClosing = new AtomicBoolean(false);
30+
protected McpServerSession.Factory sessionFactory;
31+
32+
public AbstractMcpTransportProvider(
33+
McpJsonMapper mcpJsonMapper, McpTransportContextExtractor<Context> contextExtractor) {
34+
this.mcpJsonMapper = mcpJsonMapper;
35+
this.contextExtractor = contextExtractor;
36+
}
37+
38+
protected abstract String transportName();
39+
40+
@Override
41+
public void setSessionFactory(McpServerSession.Factory sessionFactory) {
42+
this.sessionFactory = sessionFactory;
43+
}
44+
45+
@Override
46+
public Mono<Void> notifyClients(String method, Object params) {
47+
if (sessions.isEmpty()) {
48+
log.debug("No active {} sessions to broadcast a message to", transportName());
49+
return Mono.empty();
50+
}
51+
52+
if (log.isDebugEnabled()) {
53+
log.debug(
54+
"Attempting to broadcast to {} active {} sessions", sessions.size(), transportName());
55+
}
56+
57+
return Flux.fromIterable(sessions.values())
58+
.flatMap(
59+
session ->
60+
session
61+
.sendNotification(method, params)
62+
.doOnError(
63+
e ->
64+
log.error(
65+
"Failed to send message to {} session {}: {}",
66+
transportName(),
67+
session.getId(),
68+
e.getMessage()))
69+
.onErrorComplete())
70+
.then();
71+
}
72+
73+
@Override
74+
public Mono<Void> closeGracefully() {
75+
return Flux.fromIterable(sessions.values())
76+
.doFirst(
77+
() -> {
78+
isClosing.set(true);
79+
if (log.isDebugEnabled()) {
80+
log.debug(
81+
"Initiating graceful shutdown for {} {} sessions",
82+
sessions.size(),
83+
transportName());
84+
}
85+
})
86+
.flatMap(McpServerSession::closeGracefully)
87+
.doFinally(signalType -> sessions.clear())
88+
.then();
89+
}
90+
}

modules/jooby-mcp/src/main/java/io/jooby/mcp/transport/SseTransportProvider.java

Lines changed: 17 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -8,57 +8,28 @@
88
import static io.jooby.mcp.transport.TransportConstants.*;
99

1010
import java.io.IOException;
11-
import java.util.concurrent.ConcurrentHashMap;
12-
import java.util.concurrent.atomic.AtomicBoolean;
13-
14-
import org.slf4j.Logger;
15-
import org.slf4j.LoggerFactory;
1611

1712
import io.jooby.*;
1813
import io.jooby.internal.mcp.McpServerConfig;
1914
import io.modelcontextprotocol.common.McpTransportContext;
2015
import io.modelcontextprotocol.json.McpJsonMapper;
21-
import io.modelcontextprotocol.json.TypeRef;
2216
import io.modelcontextprotocol.server.McpTransportContextExtractor;
2317
import io.modelcontextprotocol.spec.*;
24-
import reactor.core.publisher.Flux;
2518
import reactor.core.publisher.Mono;
2619

27-
/**
28-
* Provides SSE transport implementation for MCP server using Jooby framework. Handles client
29-
* connections, message routing, and session management.
30-
*/
31-
@SuppressWarnings("PMD")
32-
public class SseTransportProvider implements McpServerTransportProvider {
33-
34-
private static final Logger LOG = LoggerFactory.getLogger(SseTransportProvider.class);
20+
public class SseTransportProvider extends AbstractMcpTransportProvider {
3521

3622
private static final String ENDPOINT_EVENT_TYPE = "endpoint";
3723
private static final String SESSION_ID_KEY = "sessionId";
38-
3924
private final String messageEndpoint;
40-
private final McpJsonMapper mcpJsonMapper;
41-
private final ConcurrentHashMap<String, McpServerSession> sessions = new ConcurrentHashMap<>();
42-
private final McpTransportContextExtractor<Context> contextExtractor;
43-
44-
private McpServerSession.Factory sessionFactory;
45-
private final AtomicBoolean isClosing = new AtomicBoolean(false);
46-
47-
/**
48-
* Constructs a new Jooby Reactive SSE transport provider instance.
49-
*
50-
* @param app The Jooby application instance to register endpoints with
51-
* @param serverConfig The MCP server configuration containing endpoint settings
52-
* @param mcpJsonMapper The MCP JSON mapper for message serialization/deserialization
53-
*/
25+
5426
public SseTransportProvider(
5527
Jooby app,
5628
McpServerConfig serverConfig,
5729
McpJsonMapper mcpJsonMapper,
5830
McpTransportContextExtractor<Context> contextExtractor) {
59-
this.mcpJsonMapper = mcpJsonMapper;
31+
super(mcpJsonMapper, contextExtractor);
6032
this.messageEndpoint = serverConfig.getMessageEndpoint();
61-
this.contextExtractor = contextExtractor;
6233
String sseEndpoint = serverConfig.getSseEndpoint();
6334

6435
app.head(sseEndpoint, ctx -> StatusCode.OK).produces(TEXT_EVENT_STREAM);
@@ -67,66 +38,24 @@ public SseTransportProvider(
6738
}
6839

6940
@Override
70-
public void setSessionFactory(McpServerSession.Factory sessionFactory) {
71-
this.sessionFactory = sessionFactory;
72-
}
73-
74-
@Override
75-
public Mono<Void> notifyClients(String method, Object params) {
76-
if (sessions.isEmpty()) {
77-
LOG.debug("No active sessions to broadcast a message to");
78-
return Mono.empty();
79-
}
80-
81-
if (LOG.isDebugEnabled()) {
82-
LOG.debug("Attempting to broadcast a message to {} active sessions", sessions.size());
83-
}
84-
85-
return Flux.fromIterable(sessions.values())
86-
.flatMap(
87-
session ->
88-
session
89-
.sendNotification(method, params)
90-
.doOnError(
91-
e ->
92-
LOG.error(
93-
"Failed to send message to session {}: {}",
94-
session.getId(),
95-
e.getMessage()))
96-
.onErrorComplete())
97-
.then();
98-
}
99-
100-
@Override
101-
public Mono<Void> closeGracefully() {
102-
return Flux.fromIterable(sessions.values())
103-
.doFirst(
104-
() -> {
105-
isClosing.set(true);
106-
if (LOG.isDebugEnabled()) {
107-
LOG.debug("Initiating graceful shutdown with {} active sessions", sessions.size());
108-
}
109-
})
110-
.flatMap(McpServerSession::closeGracefully)
111-
.doFinally(signalType -> sessions.clear())
112-
.then();
41+
protected String transportName() {
42+
return "SSE";
11343
}
11444

11545
private void handleSseConnection(ServerSentEmitter sse) {
116-
JoobyMcpSessionTransport transport = new JoobyMcpSessionTransport(sse);
46+
JoobyMcpSessionTransport transport = new JoobyMcpSessionTransport(mcpJsonMapper, sse);
11747
McpServerSession session = sessionFactory.create(transport);
11848
String sessionId = session.getId();
11949

120-
LOG.debug("New SSE connection has been established. Session ID: {}", sessionId);
50+
log.debug("New SSE connection established. Session ID: {}", sessionId);
12151
sessions.put(sessionId, session);
12252

12353
sse.onClose(
12454
() -> {
125-
LOG.debug("Session with ID {} has been cancelled", sessionId);
55+
log.debug("Session with ID {} has been cancelled", sessionId);
12656
sessions.remove(sessionId);
12757
});
12858

129-
LOG.debug("Sending initial endpoint event to session: {}", sessionId);
13059
sse.send(
13160
new ServerSentMessage(this.messageEndpoint + "?sessionId=" + sessionId)
13261
.setEvent(ENDPOINT_EVENT_TYPE));
@@ -143,7 +72,7 @@ private Object handleMessage(Context ctx) {
14372
if (ctx.query(SESSION_ID_KEY).isMissing()) {
14473
ctx.setResponseCode(StatusCode.BAD_REQUEST);
14574
return McpError.builder(McpSchema.ErrorCodes.INVALID_REQUEST)
146-
.message("Session ID missing in message endpoint")
75+
.message("Session ID missing")
14776
.build();
14877
}
14978

@@ -153,7 +82,7 @@ private Object handleMessage(Context ctx) {
15382
if (session == null) {
15483
ctx.setResponseCode(StatusCode.NOT_FOUND);
15584
return McpError.builder(McpSchema.ErrorCodes.RESOURCE_NOT_FOUND)
156-
.message("Session not found: " + sessionId)
85+
.message("Session not found")
15786
.build();
15887
}
15988

@@ -167,30 +96,28 @@ private Object handleMessage(Context ctx) {
16796
.handle(message)
16897
.contextWrite(
16998
reactorCtx ->
170-
reactorCtx
171-
.put(io.modelcontextprotocol.common.McpTransportContext.KEY, transportContext)
172-
.put("CTX", ctx))
99+
reactorCtx.put(McpTransportContext.KEY, transportContext).put("CTX", ctx))
173100
.then(Mono.just((Object) StatusCode.OK))
174101
.onErrorResume(
175102
error -> {
176-
LOG.error("Error processing message: {}", error.getMessage());
103+
log.error("Error processing message: {}", error.getMessage());
177104
return Mono.just(StatusCode.OK);
178105
})
179106
.switchIfEmpty(Mono.just((Object) StatusCode.OK))
180107
.block();
181108
} catch (IOException | IllegalArgumentException e) {
182-
LOG.error("Failed to deserialize message: {}", e.getMessage());
109+
log.error("Failed to deserialize message: {}", e.getMessage());
183110
return McpError.builder(McpSchema.ErrorCodes.PARSE_ERROR)
184111
.message("Invalid message format")
185112
.build();
186113
}
187114
}
188115

189-
private class JoobyMcpSessionTransport implements McpServerTransport {
190-
116+
private static class JoobyMcpSessionTransport extends AbstractMcpTransport {
191117
private final ServerSentEmitter sse;
192118

193-
public JoobyMcpSessionTransport(ServerSentEmitter sse) {
119+
public JoobyMcpSessionTransport(McpJsonMapper mcpJsonMapper, ServerSentEmitter sse) {
120+
super(mcpJsonMapper);
194121
this.sse = sse;
195122
}
196123

@@ -202,22 +129,12 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
202129
String jsonText = mcpJsonMapper.writeValueAsString(message);
203130
sse.send(new ServerSentMessage(jsonText).setEvent(MESSAGE_EVENT_TYPE));
204131
} catch (Exception e) {
205-
LOG.error("Failed to send message: {}", e.getMessage());
132+
log.error("Failed to send message: {}", e.getMessage());
206133
sse.send(SSE_ERROR_EVENT, e.getMessage());
207134
}
208135
});
209136
}
210137

211-
@Override
212-
public <T> T unmarshalFrom(Object data, TypeRef<T> typeRef) {
213-
return mcpJsonMapper.convertValue(data, typeRef);
214-
}
215-
216-
@Override
217-
public Mono<Void> closeGracefully() {
218-
return Mono.fromRunnable(sse::close);
219-
}
220-
221138
@Override
222139
public void close() {
223140
sse.close();

0 commit comments

Comments
 (0)