Skip to content

Commit b53e333

Browse files
committed
Stabilize QUIC and HTTP/3 transport lifecycle
Signed-off-by: jeroen.veltman <jeroen.veltman@nextend.nl>
1 parent 758fcf7 commit b53e333

7 files changed

Lines changed: 387 additions & 35 deletions

File tree

rsocket-core/src/main/java/io/rsocket/core/SetupHandlingDuplexConnection.java

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
import io.rsocket.RSocketErrorException;
77
import java.net.SocketAddress;
88
import java.nio.channels.ClosedChannelException;
9+
import java.util.Queue;
10+
import java.util.concurrent.ConcurrentLinkedQueue;
911
import org.reactivestreams.Subscription;
1012
import reactor.core.CoreSubscriber;
1113
import reactor.core.publisher.Flux;
@@ -24,8 +26,10 @@ class SetupHandlingDuplexConnection extends Flux<ByteBuf>
2426

2527
Subscription s;
2628
boolean firstFrameReceived = false;
29+
boolean subscriberReady = false;
2730

28-
CoreSubscriber<? super ByteBuf> actual;
31+
volatile CoreSubscriber<? super ByteBuf> actual;
32+
final Queue<ByteBuf> pendingFrames = new ConcurrentLinkedQueue<>();
2933

3034
boolean done;
3135
Throwable t;
@@ -82,6 +86,7 @@ public void subscribe(CoreSubscriber<? super ByteBuf> actual) {
8286

8387
this.actual = actual;
8488
actual.onSubscribe(this);
89+
drainPendingFrames(actual);
8590
}
8691

8792
@Override
@@ -91,7 +96,8 @@ public void request(long n) {
9196
return;
9297
}
9398

94-
s.request(Long.MAX_VALUE);
99+
subscriberReady = true;
100+
drainPendingFrames(actual);
95101
}
96102

97103
@Override
@@ -104,7 +110,7 @@ public void cancel() {
104110
public void onSubscribe(Subscription s) {
105111
if (Operators.validate(this.s, s)) {
106112
this.s = s;
107-
s.request(1);
113+
s.request(Long.MAX_VALUE);
108114
}
109115
}
110116

@@ -116,7 +122,12 @@ public void onNext(ByteBuf frame) {
116122
return;
117123
}
118124

119-
actual.onNext(frame);
125+
final CoreSubscriber<? super ByteBuf> actual = this.actual;
126+
if (actual != null && subscriberReady) {
127+
actual.onNext(frame);
128+
} else {
129+
pendingFrames.offer(frame);
130+
}
120131
}
121132

122133
@Override
@@ -173,4 +184,15 @@ public ByteBufAllocator alloc() {
173184
public String toString() {
174185
return "SetupHandlingDuplexConnection{" + "source=" + source + ", done=" + done + '}';
175186
}
187+
188+
private void drainPendingFrames(CoreSubscriber<? super ByteBuf> actual) {
189+
if (actual == null) {
190+
return;
191+
}
192+
193+
ByteBuf frame;
194+
while ((frame = pendingFrames.poll()) != null) {
195+
actual.onNext(frame);
196+
}
197+
}
176198
}

rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,7 @@ public SocketAddress remoteAddress() {
300300
@Override
301301
public void request(long n) {
302302
if (state == 1 && STATE.compareAndSet(this, 1, 2)) {
303-
// happens for the very first time with the initial connection
303+
// Fallback in case the downstream requests before subscribe() finishes initializing.
304304
initConnection(this.activeConnection);
305305
}
306306
}
@@ -315,6 +315,9 @@ public void subscribe(CoreSubscriber<? super ByteBuf> receiverSubscriber) {
315315
if (state == 0 && STATE.compareAndSet(this, 0, 1)) {
316316
receiveSubscriber = receiverSubscriber;
317317
receiverSubscriber.onSubscribe(this);
318+
if (STATE.compareAndSet(this, 1, 2)) {
319+
initConnection(this.activeConnection);
320+
}
318321
}
319322
}
320323

rsocket-transport-h3/src/main/java/io/rsocket/transport/netty/Http3DuplexConnection.java

Lines changed: 52 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ public final class Http3DuplexConnection extends BaseDuplexConnection {
4141
private final String side;
4242
private final Channel connection;
4343
private final UnboundedProcessor inbound;
44+
private final Object inboundBufferLock = new Object();
45+
private boolean handlingInboundData;
46+
private boolean inboundBufferReleasePending;
4447
private ByteBuf inboundBuffer;
4548

4649
public Http3DuplexConnection(Channel connection) {
@@ -143,33 +146,53 @@ public void handleHeaders(Http3HeadersFrame frame) {
143146
}
144147

145148
public void handleData(Http3DataFrame frame) {
149+
final ByteBuf buffer;
146150
try {
147151
ByteBuf content = frame.content();
148152
if (!content.isReadable()) {
149153
return;
150154
}
151155

152-
if (inboundBuffer == null) {
153-
inboundBuffer = alloc().buffer(content.readableBytes());
156+
synchronized (inboundBufferLock) {
157+
if (inboundBuffer == null) {
158+
inboundBuffer = alloc().buffer(content.readableBytes());
159+
}
160+
buffer = inboundBuffer;
161+
handlingInboundData = true;
154162
}
155-
inboundBuffer.writeBytes(content, content.readerIndex(), content.readableBytes());
156163

157-
while (inboundBuffer.readableBytes() >= FrameLengthCodec.FRAME_LENGTH_SIZE) {
158-
inboundBuffer.markReaderIndex();
159-
int frameLength = FrameLengthCodec.length(inboundBuffer);
160-
if (inboundBuffer.readableBytes() < FrameLengthCodec.FRAME_LENGTH_SIZE + frameLength) {
161-
inboundBuffer.resetReaderIndex();
164+
buffer.writeBytes(content, content.readerIndex(), content.readableBytes());
165+
166+
while (buffer.readableBytes() >= FrameLengthCodec.FRAME_LENGTH_SIZE) {
167+
buffer.markReaderIndex();
168+
int frameLength = FrameLengthCodec.length(buffer);
169+
if (buffer.readableBytes() < FrameLengthCodec.FRAME_LENGTH_SIZE + frameLength) {
170+
buffer.resetReaderIndex();
162171
break;
163172
}
164173

165-
inboundBuffer.skipBytes(FrameLengthCodec.FRAME_LENGTH_SIZE);
166-
inbound.tryEmitNormal(inboundBuffer.readRetainedSlice(frameLength));
174+
buffer.skipBytes(FrameLengthCodec.FRAME_LENGTH_SIZE);
175+
inbound.tryEmitNormal(buffer.readRetainedSlice(frameLength));
176+
}
177+
178+
boolean shouldRelease;
179+
synchronized (inboundBufferLock) {
180+
handlingInboundData = false;
181+
if (!inboundBufferReleasePending && buffer.isReadable()) {
182+
buffer.discardReadBytes();
183+
}
184+
185+
shouldRelease = inboundBufferReleasePending || !buffer.isReadable();
186+
if (shouldRelease) {
187+
inboundBufferReleasePending = false;
188+
if (inboundBuffer == buffer) {
189+
inboundBuffer = null;
190+
}
191+
}
167192
}
168193

169-
if (inboundBuffer.isReadable()) {
170-
inboundBuffer.discardReadBytes();
171-
} else {
172-
releaseInboundBuffer();
194+
if (shouldRelease) {
195+
buffer.release();
173196
}
174197
} finally {
175198
frame.release();
@@ -188,10 +211,23 @@ public void handleInputClosed() {
188211
}
189212

190213
private void releaseInboundBuffer() {
191-
if (inboundBuffer != null) {
192-
inboundBuffer.release();
214+
final ByteBuf bufferToRelease;
215+
synchronized (inboundBufferLock) {
216+
if (inboundBuffer == null) {
217+
return;
218+
}
219+
220+
if (handlingInboundData) {
221+
inboundBufferReleasePending = true;
222+
return;
223+
}
224+
225+
bufferToRelease = inboundBuffer;
193226
inboundBuffer = null;
227+
inboundBufferReleasePending = false;
194228
}
229+
230+
bufferToRelease.release();
195231
}
196232

197233
@Override

rsocket-transport-quic/src/main/java/io/rsocket/transport/netty/QuicDuplexConnection.java

Lines changed: 91 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import io.rsocket.frame.FrameLengthCodec;
3131
import io.rsocket.internal.BaseDuplexConnection;
3232
import io.rsocket.internal.UnboundedProcessor;
33-
import java.nio.channels.ClosedChannelException;
3433
import java.net.SocketAddress;
3534
import java.util.Objects;
3635
import reactor.core.publisher.Flux;
@@ -41,6 +40,10 @@ public final class QuicDuplexConnection extends BaseDuplexConnection {
4140
private final String side;
4241
private final Channel connection;
4342
private final UnboundedProcessor inbound;
43+
private final Object inboundBufferLock = new Object();
44+
private boolean handlingInboundData;
45+
private boolean inboundBufferReleasePending;
46+
private ByteBuf inboundBuffer;
4447

4548
public QuicDuplexConnection(Channel connection) {
4649
this("unknown", connection);
@@ -50,6 +53,17 @@ public QuicDuplexConnection(String side, Channel connection) {
5053
this.connection = Objects.requireNonNull(connection, "connection must not be null");
5154
this.side = side;
5255
this.inbound = new UnboundedProcessor();
56+
this.connection
57+
.closeFuture()
58+
.addListener(
59+
future -> {
60+
releaseInboundBuffer();
61+
if (future.isSuccess()) {
62+
onClose.tryEmitEmpty();
63+
} else {
64+
onClose.tryEmitError(future.cause());
65+
}
66+
});
5367

5468
this.connection
5569
.pipeline()
@@ -58,28 +72,23 @@ public QuicDuplexConnection(String side, Channel connection) {
5872
@Override
5973
public void channelRead(ChannelHandlerContext ctx, Object msg) {
6074
if (msg instanceof ByteBuf) {
61-
ByteBuf byteBuf = (ByteBuf) msg;
62-
ByteBuf frame = null;
63-
try {
64-
frame = FrameLengthCodec.frame(byteBuf).retain();
65-
} finally {
66-
byteBuf.release();
67-
}
68-
inbound.tryEmitNormal(frame);
75+
handleData((ByteBuf) msg);
6976
} else {
7077
ctx.fireChannelRead(msg);
7178
}
7279
}
7380

7481
@Override
7582
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
83+
releaseInboundBuffer();
7684
inbound.tryEmitError(cause);
7785
onClose.tryEmitError(cause);
7886
}
7987

8088
@Override
8189
public void channelInactive(ChannelHandlerContext ctx) {
82-
inbound.tryEmitError(new ClosedChannelException());
90+
releaseInboundBuffer();
91+
inbound.tryEmitComplete();
8392
onClose.tryEmitEmpty();
8493
ctx.fireChannelInactive();
8594
}
@@ -103,6 +112,7 @@ public void channelInactive(ChannelHandlerContext ctx) {
103112
1)
104113
.doOnError(
105114
throwable -> {
115+
releaseInboundBuffer();
106116
onClose.tryEmitError(throwable);
107117
this.connection.close();
108118
})
@@ -134,6 +144,7 @@ public SocketAddress remoteAddress() {
134144

135145
@Override
136146
protected void doOnClose() {
147+
releaseInboundBuffer();
137148
connection.close();
138149
}
139150

@@ -162,4 +173,74 @@ public void sendFrame(int streamId, ByteBuf frame) {
162173
public String toString() {
163174
return "QuicDuplexConnection{" + "side='" + side + '\'' + ", connection=" + connection + '}';
164175
}
176+
177+
private void handleData(ByteBuf byteBuf) {
178+
final ByteBuf buffer;
179+
try {
180+
if (!byteBuf.isReadable()) {
181+
return;
182+
}
183+
184+
synchronized (inboundBufferLock) {
185+
if (inboundBuffer == null) {
186+
inboundBuffer = alloc().buffer(byteBuf.readableBytes());
187+
}
188+
buffer = inboundBuffer;
189+
handlingInboundData = true;
190+
}
191+
192+
buffer.writeBytes(byteBuf, byteBuf.readerIndex(), byteBuf.readableBytes());
193+
194+
while (buffer.readableBytes() >= FrameLengthCodec.FRAME_LENGTH_SIZE) {
195+
buffer.markReaderIndex();
196+
int frameLength = FrameLengthCodec.length(buffer);
197+
if (buffer.readableBytes() < FrameLengthCodec.FRAME_LENGTH_SIZE + frameLength) {
198+
buffer.resetReaderIndex();
199+
break;
200+
}
201+
202+
buffer.skipBytes(FrameLengthCodec.FRAME_LENGTH_SIZE);
203+
inbound.tryEmitNormal(buffer.readRetainedSlice(frameLength));
204+
}
205+
206+
boolean shouldRelease;
207+
synchronized (inboundBufferLock) {
208+
handlingInboundData = false;
209+
if (!inboundBufferReleasePending && buffer.isReadable()) {
210+
buffer.discardReadBytes();
211+
}
212+
213+
shouldRelease = inboundBufferReleasePending || !buffer.isReadable();
214+
if (shouldRelease) {
215+
inboundBufferReleasePending = false;
216+
if (inboundBuffer == buffer) {
217+
inboundBuffer = null;
218+
}
219+
}
220+
}
221+
222+
if (shouldRelease) {
223+
buffer.release();
224+
}
225+
} finally {
226+
byteBuf.release();
227+
}
228+
}
229+
230+
private void releaseInboundBuffer() {
231+
synchronized (inboundBufferLock) {
232+
if (inboundBuffer == null) {
233+
return;
234+
}
235+
236+
if (handlingInboundData) {
237+
inboundBufferReleasePending = true;
238+
return;
239+
}
240+
241+
inboundBuffer.release();
242+
inboundBuffer = null;
243+
inboundBufferReleasePending = false;
244+
}
245+
}
165246
}

rsocket-transport-quic/src/main/java/io/rsocket/transport/netty/internal/QuicTransportBootstrap.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -161,10 +161,8 @@ public static Mono<CloseableChannel> bindServer(
161161
protected void initChannel(QuicStreamChannel ch) {
162162
final DuplexConnection connection =
163163
new QuicDuplexConnection("server", ch);
164-
acceptor
165-
.apply(connection)
166-
.doFinally(__ -> ch.close())
167-
.subscribe(null, t -> ch.close());
164+
acceptor.apply(connection).subscribe(null, t -> ch.close());
165+
connection.onClose().doFinally(__ -> ch.close()).subscribe();
168166
}
169167
})
170168
.build();

0 commit comments

Comments
 (0)