From 1a64f83a2f851a6b12331ee2e1683ddd83cf2118 Mon Sep 17 00:00:00 2001 From: Aayush Atharva Date: Sat, 13 Jun 2026 12:10:08 +0000 Subject: [PATCH] Fix HTTP/2 silent request timeouts under connection disruptions (#2160) --- .../netty/channel/ChannelManager.java | 14 +- .../netty/channel/Http2ConnectionState.java | 89 +- .../netty/handler/Http2Handler.java | 35 +- .../netty/request/NettyRequestSender.java | 43 +- .../Http2MultiplexBugRegressionTest.java | 927 ++++++++++++++++++ .../Http2StreamOrphanRegressionTest.java | 374 +++++++ .../channel/Http2ConnectionStateTest.java | 652 ++++++++++++ 7 files changed, 2111 insertions(+), 23 deletions(-) create mode 100644 client/src/test/java/org/asynchttpclient/Http2MultiplexBugRegressionTest.java create mode 100644 client/src/test/java/org/asynchttpclient/Http2StreamOrphanRegressionTest.java create mode 100644 client/src/test/java/org/asynchttpclient/netty/channel/Http2ConnectionStateTest.java diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java b/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java index 0af03dfd41..e6a84f093e 100755 --- a/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java +++ b/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java @@ -87,6 +87,7 @@ import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLException; +import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.Map; @@ -353,8 +354,17 @@ public void registerHttp2Connection(Object partitionKey, Channel channel) { state.setPartitionKey(partitionKey); } http2Connections.put(partitionKey, channel); - // Auto-remove from registry when the connection closes - channel.closeFuture().addListener(future -> removeHttp2Connection(partitionKey, channel)); + // When the connection closes, remove it from the registry AND fail any requests still queued + // for a stream slot. Without the latter, requests sitting in pendingOpeners when the parent + // connection drops have no stream channel (so no channelInactive is ever delivered for them) + // and would survive only until the request timeout fires — the silent-timeout bug of #2160. + channel.closeFuture().addListener(future -> { + removeHttp2Connection(partitionKey, channel); + if (state != null) { + state.failPendingOpeners(orphan -> + orphan.abort(new IOException("HTTP/2 connection closed before a stream could be opened"))); + } + }); } /** diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/Http2ConnectionState.java b/client/src/main/java/org/asynchttpclient/netty/channel/Http2ConnectionState.java index 3911c45d72..3a081cf5a7 100644 --- a/client/src/main/java/org/asynchttpclient/netty/channel/Http2ConnectionState.java +++ b/client/src/main/java/org/asynchttpclient/netty/channel/Http2ConnectionState.java @@ -16,10 +16,14 @@ package org.asynchttpclient.netty.channel; import io.netty.util.AttributeKey; +import org.asynchttpclient.netty.NettyResponseFuture; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; /** * Tracks per-connection HTTP/2 state: active stream count, max concurrent streams, @@ -30,15 +34,31 @@ public class Http2ConnectionState { public static final AttributeKey HTTP2_STATE_KEY = AttributeKey.valueOf("http2ConnectionState"); + /** + * A request waiting for a free stream slot. Carries both the future (so it can be failed if the + * connection dies before a slot frees up) and the action that actually opens the stream. + */ + private static final class PendingOpener { + final NettyResponseFuture future; + final Runnable opener; + + PendingOpener(NettyResponseFuture future, Runnable opener) { + this.future = future; + this.opener = opener; + } + } + private final AtomicInteger activeStreams = new AtomicInteger(0); private volatile int maxConcurrentStreams = Integer.MAX_VALUE; private final AtomicBoolean draining = new AtomicBoolean(false); private volatile int lastGoAwayStreamId = Integer.MAX_VALUE; - private final ConcurrentLinkedQueue pendingOpeners = new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedQueue pendingOpeners = new ConcurrentLinkedQueue<>(); + private final Object pendingLock = new Object(); + private final AtomicBoolean closed = new AtomicBoolean(false); private volatile Object partitionKey; public boolean tryAcquireStream() { - if (draining.get()) { + if (draining.get() || closed.get()) { return false; } while (true) { @@ -54,22 +74,63 @@ public boolean tryAcquireStream() { public void releaseStream() { activeStreams.decrementAndGet(); - // Try to dequeue and run a pending opener - Runnable pending = pendingOpeners.poll(); - if (pending != null) { - pending.run(); - } + drainPendingOpeners(); } public void addPendingOpener(Runnable opener) { - pendingOpeners.add(opener); - // Re-check in case a stream was released between the failed tryAcquire and this enqueue - if (tryAcquireStream()) { - Runnable dequeued = pendingOpeners.poll(); - if (dequeued != null) { - dequeued.run(); + addPendingOpener(null, opener); + } + + public void addPendingOpener(NettyResponseFuture future, Runnable opener) { + synchronized (pendingLock) { + if (tryAcquireStream()) { + opener.run(); } else { - releaseStream(); + pendingOpeners.add(new PendingOpener(future, opener)); + } + } + } + + private void drainPendingOpeners() { + synchronized (pendingLock) { + PendingOpener pending = pendingOpeners.poll(); + if (pending != null) { + if (tryAcquireStream()) { + pending.opener.run(); + } else { + // Put it back — another releaseStream() will pick it up + pendingOpeners.offer(pending); + } + } + } + } + + /** + * Permanently marks the connection unusable and hands every queued (never-started) request to + * {@code failer}. After this call {@link #tryAcquireStream()} returns {@code false}, so a request + * enqueued concurrently with the close is failed by its own caller's post-enqueue active-channel + * check rather than being silently orphaned. + *

+ * Without this, requests sitting in {@link #pendingOpeners} when the parent connection drops are + * never completed and survive only until the request timeout fires — the silent-timeout + * regression of Issue #2160 (a queued request has no stream channel, hence no channelInactive + * is ever delivered for it). + * + * @param failer invoked once per orphaned request future (e.g. to abort it) + */ + public void failPendingOpeners(Consumer> failer) { + closed.set(true); + List drained = new ArrayList<>(); + synchronized (pendingLock) { + PendingOpener p; + while ((p = pendingOpeners.poll()) != null) { + drained.add(p); + } + } + // Fail outside the lock — failer may re-enter client code. + for (PendingOpener p : drained) { + if (p.future != null) { + failer.accept(p.future); } } } diff --git a/client/src/main/java/org/asynchttpclient/netty/handler/Http2Handler.java b/client/src/main/java/org/asynchttpclient/netty/handler/Http2Handler.java index 7bac415373..0d8e7c6c45 100644 --- a/client/src/main/java/org/asynchttpclient/netty/handler/Http2Handler.java +++ b/client/src/main/java/org/asynchttpclient/netty/handler/Http2Handler.java @@ -196,7 +196,10 @@ private void handleHttp2TrailingHeadersFrame(Http2HeadersFrame headersFrame, Cha */ private void handleHttp2ResetFrame(Http2ResetFrame resetFrame, Channel channel, NettyResponseFuture future) { long errorCode = resetFrame.errorCode(); - readFailed(channel, future, new IOException("HTTP/2 stream reset by server, error code: " + errorCode)); + // RFC 7540 §5.4.2/§6.4: RST_STREAM is stream-scoped and MUST NOT terminate the connection. + // Fail only this stream's future and close only the (single-use) stream child channel — + // sibling streams multiplexed on the same parent connection must be left untouched. + streamFailed(channel, future, new IOException("HTTP/2 stream reset by server, error code: " + errorCode)); } /** @@ -300,11 +303,41 @@ private void readFailed(Channel channel, NettyResponseFuture future, Throwabl } } + /** + * Fails a single stream's future WITHOUT closing the parent connection. Used for stream-scoped + * events (RST_STREAM, and the {@code channelInactive}/exception Netty delivers when one stream + * dies). {@link #finishUpdate} with {@code close=false} closes only the single-use stream child + * channel and releases its stream slot, leaving the parent connection and its sibling multiplexed + * streams untouched. + *

+ * When the PARENT connection genuinely drops, Netty fires {@code channelInactive} on every child + * stream, so each in-flight future is still failed individually and promptly. + */ + private void streamFailed(Channel channel, NettyResponseFuture future, Throwable t) { + if (future.isDone()) { + return; + } + try { + requestSender.abort(channel, future, t); + } catch (Exception abortException) { + logger.debug("Abort failed", abortException); + } finally { + finishUpdate(future, channel, false); + } + } + @Override public void handleException(NettyResponseFuture future, Throwable error) { + // Stream-scoped: an exception on one stream child channel must not tear down the parent + // connection that sibling multiplexed streams share (see streamFailed). + streamFailed(future.channel(), future, error); } @Override public void handleChannelInactive(NettyResponseFuture future) { + // Stream-scoped (see streamFailed): closing the parent here would fail unrelated sibling + // streams on the same connection — the RST_STREAM/single-stream-close blast-radius bug. + streamFailed(future.channel(), future, + new IOException("HTTP/2 stream channel closed unexpectedly")); } } diff --git a/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java b/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java index 08451ce953..aaa773efe9 100755 --- a/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java +++ b/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java @@ -276,8 +276,14 @@ private ListenableFuture sendRequestWithOpenChannel(NettyResponseFuture void writeRequest(NettyResponseFuture future, Channel channel) { */ private void writeHttp2Request(NettyResponseFuture future, Channel parentChannel) { Http2ConnectionState state = parentChannel.attr(Http2ConnectionState.HTTP2_STATE_KEY).get(); - Runnable openStream = () -> openHttp2Stream(future, parentChannel); if (state != null && !state.tryAcquireStream()) { if (state.isDraining()) { @@ -495,13 +500,27 @@ private void writeHttp2Request(NettyResponseFuture future, Channel parent return; } // Queue for later when a stream slot opens up - state.addPendingOpener(openStream); + state.addPendingOpener(future, () -> openHttp2Stream(future, parentChannel, state)); + // The parent connection may have closed concurrently with the enqueue above; if so its + // close listener already drained the queue and this future would be orphaned (it has no + // stream channel, so no channelInactive is ever delivered for it). Detect that race and + // fail it here so it never survives only to the request timeout (Issue #2160). + if (!parentChannel.isActive() && !future.isDone()) { + abort(parentChannel, future, + new java.io.IOException("HTTP/2 connection closed while request was queued")); + } return; } - openStream.run(); + openHttp2Stream(future, parentChannel, state); + } + + private static void releaseHttp2Stream(Http2ConnectionState state) { + if (state != null) { + state.releaseStream(); + } } - private void openHttp2Stream(NettyResponseFuture future, Channel parentChannel) { + private void openHttp2Stream(NettyResponseFuture future, Channel parentChannel, Http2ConnectionState state) { new Http2StreamChannelBootstrap(parentChannel) .handler(new ChannelInitializer() { @Override @@ -519,6 +538,7 @@ protected void initChannel(Http2StreamChannel streamCh) { Http2StreamChannel streamChannel = f.getNow(); channelManager.registerOpenChannel(streamChannel); Channels.setAttribute(streamChannel, future); + Channels.setActiveToken(streamChannel); future.attachChannel(streamChannel, false); try { AsyncHandler asyncHandler = future.getAsyncHandler(); @@ -526,6 +546,11 @@ protected void initChannel(Http2StreamChannel streamCh) { asyncHandler.onRequestSend(future.getNettyRequest()); } catch (Exception e) { LOGGER.error("onRequestSend crashed", e); + // The slot was acquired before open(); aborting here completes the + // future, so the stream's later channelInactive won't run finishUpdate + // (its !future.isDone() guard fails). Release the slot explicitly, + // otherwise activeStreams leaks and eventually wedges the connection. + releaseHttp2Stream(state); abort(streamChannel, future, e); return; } @@ -538,9 +563,15 @@ protected void initChannel(Http2StreamChannel streamCh) { scheduleReadTimeout(future); } catch (Exception e) { LOGGER.error("Can't write HTTP/2 request", e); + // See above: release the slot the failed stream will never release itself. + releaseHttp2Stream(state); abort(streamChannel, future, e); } } else { + // Stream channel was never opened — release the acquired stream slot + if (state != null) { + state.releaseStream(); + } abort(parentChannel, future, f.cause()); } }); diff --git a/client/src/test/java/org/asynchttpclient/Http2MultiplexBugRegressionTest.java b/client/src/test/java/org/asynchttpclient/Http2MultiplexBugRegressionTest.java new file mode 100644 index 0000000000..f22896846e --- /dev/null +++ b/client/src/test/java/org/asynchttpclient/Http2MultiplexBugRegressionTest.java @@ -0,0 +1,927 @@ +/* + * Copyright (c) 2014-2026 AsyncHttpClient Project. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.asynchttpclient; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http2.DefaultHttp2DataFrame; +import io.netty.handler.codec.http2.DefaultHttp2GoAwayFrame; +import io.netty.handler.codec.http2.DefaultHttp2Headers; +import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame; +import io.netty.handler.codec.http2.Http2DataFrame; +import io.netty.handler.codec.http2.Http2Error; +import io.netty.handler.codec.http2.Http2FrameCodecBuilder; +import io.netty.handler.codec.http2.Http2Headers; +import io.netty.handler.codec.http2.Http2HeadersFrame; +import io.netty.handler.codec.http2.Http2MultiplexHandler; +import io.netty.handler.codec.http2.Http2StreamChannel; +import io.netty.handler.ssl.ApplicationProtocolConfig; +import io.netty.handler.ssl.ApplicationProtocolNames; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.pkitesting.CertificateBuilder; +import io.netty.pkitesting.X509Bundle; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.concurrent.GlobalEventExecutor; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.asynchttpclient.Dsl.asyncHttpClient; +import static org.asynchttpclient.Dsl.config; +import static org.junit.jupiter.api.Assertions.*; + +/** + * Regression tests for the HTTP/2 multiplexing bugs reported in Issue #2160. + *

+ * These tests reproduce the exact failure scenarios — stream semaphore leaks, + * connection drops during multiplexed requests, GOAWAY during stream open, + * and concurrent request stalling — and verify the fixes prevent them. + *

+ * Each test is designed to FAIL on the buggy code and PASS on the fixed code, + * serving as a permanent guardrail against regressions. + */ +public class Http2MultiplexBugRegressionTest { + + private NioEventLoopGroup serverGroup; + private Channel serverChannel; + private ChannelGroup serverChildChannels; + private SslContext serverSslCtx; + private int serverPort; + + @BeforeEach + public void startServer() throws Exception { + X509Bundle bundle = new CertificateBuilder() + .subject("CN=localhost") + .setIsCertificateAuthority(true) + .buildSelfSigned(); + + serverSslCtx = SslContextBuilder.forServer(bundle.toKeyManagerFactory()) + .applicationProtocolConfig(new ApplicationProtocolConfig( + ApplicationProtocolConfig.Protocol.ALPN, + ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE, + ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT, + ApplicationProtocolNames.HTTP_2, + ApplicationProtocolNames.HTTP_1_1)) + .build(); + + serverGroup = new NioEventLoopGroup(1); + serverChildChannels = new DefaultChannelGroup("h2-regression-test", GlobalEventExecutor.INSTANCE); + } + + @AfterEach + public void stopServer() throws InterruptedException { + if (serverChildChannels != null) { + serverChildChannels.close().sync(); + } + if (serverChannel != null) { + serverChannel.close().sync(); + } + if (serverGroup != null) { + serverGroup.shutdownGracefully(0, 100, TimeUnit.MILLISECONDS).sync(); + } + ReferenceCountUtil.release(serverSslCtx); + } + + private String httpsUrl(String path) { + return "https://localhost:" + serverPort + path; + } + + // ========================================================================= + // Server bootstrapping helpers + // ========================================================================= + + /** + * Starts a simple HTTP/2 server that responds 200 OK to every request. + */ + private void startSimpleServer() throws InterruptedException { + startServerWithHandler(() -> new SimpleOkHandler()); + } + + /** + * Starts a server with a custom per-stream handler factory. + */ + private void startServerWithHandler(StreamHandlerFactory factory) throws InterruptedException { + ServerBootstrap b = new ServerBootstrap() + .group(serverGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) { + serverChildChannels.add(ch); + ch.pipeline() + .addLast("ssl", serverSslCtx.newHandler(ch.alloc())) + .addLast(Http2FrameCodecBuilder.forServer().build()) + .addLast(new Http2MultiplexHandler(new ChannelInitializer() { + @Override + protected void initChannel(Http2StreamChannel streamCh) { + streamCh.pipeline().addLast(factory.create()); + } + })); + } + }); + + serverChannel = b.bind(0).sync().channel(); + serverPort = ((java.net.InetSocketAddress) serverChannel.localAddress()).getPort(); + } + + @FunctionalInterface + private interface StreamHandlerFactory { + SimpleChannelInboundHandler create(); + } + + // ========================================================================= + // Server-side handlers for reproducing specific bugs + // ========================================================================= + + /** + * Simple handler that always responds 200 OK with empty body. + */ + private static class SimpleOkHandler extends SimpleChannelInboundHandler { + @Override + protected void channelRead0(ChannelHandlerContext ctx, Object msg) { + if (msg instanceof Http2HeadersFrame) { + Http2HeadersFrame headersFrame = (Http2HeadersFrame) msg; + if (headersFrame.isEndStream()) { + Http2Headers responseHeaders = new DefaultHttp2Headers().status("200"); + ctx.writeAndFlush(new DefaultHttp2HeadersFrame(responseHeaders, true)); + } + } else if (msg instanceof Http2DataFrame) { + Http2DataFrame dataFrame = (Http2DataFrame) msg; + if (dataFrame.isEndStream()) { + Http2Headers responseHeaders = new DefaultHttp2Headers().status("200"); + ctx.writeAndFlush(new DefaultHttp2HeadersFrame(responseHeaders, true)); + } + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + ctx.close(); + } + } + + /** + * Handler that delays response, then forcibly closes the TCP connection. + * This simulates a connection drop after the stream is opened but before + * the response is received — the exact scenario for Bug 3. + */ + private static class DelayThenDropHandler extends SimpleChannelInboundHandler { + private final long delayMs; + + DelayThenDropHandler(long delayMs) { + this.delayMs = delayMs; + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, Object msg) { + if (msg instanceof Http2HeadersFrame) { + Http2HeadersFrame headersFrame = (Http2HeadersFrame) msg; + if (headersFrame.isEndStream()) { + ctx.executor().schedule(() -> { + // Close the parent (TCP) connection to simulate a network drop + Channel parent = ctx.channel().parent(); + if (parent != null) { + parent.close(); + } + }, delayMs, TimeUnit.MILLISECONDS); + } + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + ctx.close(); + } + } + + /** + * Handler that accepts the request headers but then holds the stream open + * without responding. Used to keep streams alive while we test GOAWAY behavior. + */ + private static class HoldOpenHandler extends SimpleChannelInboundHandler { + @Override + protected void channelRead0(ChannelHandlerContext ctx, Object msg) { + // Intentionally do nothing — hold the stream open + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + ctx.close(); + } + } + + /** + * Handler that responds with a configurable delay. Used for multiplexing tests. + */ + private static class DelayedResponseHandler extends SimpleChannelInboundHandler { + private final long delayMs; + + DelayedResponseHandler(long delayMs) { + this.delayMs = delayMs; + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, Object msg) { + if (msg instanceof Http2HeadersFrame && ((Http2HeadersFrame) msg).isEndStream()) { + ctx.executor().schedule(() -> { + if (ctx.channel().isActive()) { + Http2Headers responseHeaders = new DefaultHttp2Headers().status("200"); + ctx.writeAndFlush(new DefaultHttp2HeadersFrame(responseHeaders, true)); + } + }, delayMs, TimeUnit.MILLISECONDS); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + ctx.close(); + } + } + + // ========================================================================= + // BUG 1 REGRESSION: Stream Semaphore Leak on Connection Restart + // + // Scenario: Server sends GOAWAY then closes the connection. If the client + // acquired a stream slot but the stream channel bootstrap fails (because the + // parent channel is closing), the slot leaks. After enough leaks, no new + // streams can be opened and all subsequent requests time out. + // + // The fix: releaseStream() is called in the .open() failure path. + // ========================================================================= + + @Test + public void streamSlotsNotLeakedAfterServerRestart() throws Exception { + // Start a server, send requests, restart the server, send more requests. + // Before the fix, stream slots would leak on each restart and eventually + // the connection would be fully starved. + startSimpleServer(); + + try (AsyncHttpClient client = asyncHttpClient(config() + .setUseInsecureTrustManager(true) + .setHttp2Enabled(true) + .setRequestTimeout(Duration.ofSeconds(5)) + .setMaxConnections(1) + .setMaxConnectionsPerHost(1))) { + + // Phase 1: Establish connection and verify it works + for (int i = 0; i < 5; i++) { + Response response = client.prepareGet(httpsUrl("/ok")) + .execute() + .get(10, SECONDS); + assertEquals(200, response.getStatusCode()); + } + + // Phase 2: Restart the server multiple times to induce GOAWAY + connection drops. + // Each restart may cause stream slot leaks if the fix is not applied. + for (int restart = 0; restart < 3; restart++) { + // Kill the server + serverChildChannels.close().sync(); + serverChannel.close().sync(); + + // Brief pause to let client detect the closure + Thread.sleep(200); + + // Restart the server on the same port + ServerBootstrap b = new ServerBootstrap() + .group(serverGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) { + serverChildChannels.add(ch); + ch.pipeline() + .addLast("ssl", serverSslCtx.newHandler(ch.alloc())) + .addLast(Http2FrameCodecBuilder.forServer().build()) + .addLast(new Http2MultiplexHandler(new ChannelInitializer() { + @Override + protected void initChannel(Http2StreamChannel streamCh) { + streamCh.pipeline().addLast(new SimpleOkHandler()); + } + })); + } + }); + + serverChannel = b.bind(serverPort).sync().channel(); + + // Phase 3: Verify requests still work after restart. + // Before the fix, leaked stream slots would accumulate across restarts + // and eventually cause all requests to queue in pendingOpeners and time out. + for (int i = 0; i < 5; i++) { + Response response = client.prepareGet(httpsUrl("/ok")) + .execute() + .get(10, SECONDS); + assertEquals(200, response.getStatusCode(), + "Request failed after server restart #" + restart + ", request #" + i); + } + } + } + } + + @Test + public void highConcurrencyAfterConnectionDropDoesNotStarve() throws Exception { + // This test fires a burst of concurrent requests, kills the connection, + // then fires another burst. Before the fix, the second burst would see + // leaked stream slots from the first burst and eventually time out. + startSimpleServer(); + + try (AsyncHttpClient client = asyncHttpClient(config() + .setUseInsecureTrustManager(true) + .setHttp2Enabled(true) + .setRequestTimeout(Duration.ofSeconds(5)) + .setMaxConnections(5) + .setMaxConnectionsPerHost(5))) { + + int batchSize = 20; + + // First burst: send many concurrent requests + List> batch1 = new ArrayList<>(); + for (int i = 0; i < batchSize; i++) { + batch1.add(client.prepareGet(httpsUrl("/ok")) + .execute() + .toCompletableFuture()); + } + for (CompletableFuture f : batch1) { + assertEquals(200, f.get(10, SECONDS).getStatusCode()); + } + + // Kill all server connections to force the client to see channel closures + serverChildChannels.close().sync(); + + // Brief pause + Thread.sleep(200); + + // Second burst: all requests should still succeed (on new connections). + // Before the fix, leaked stream slots from the first burst's connection + // would prevent new streams from being opened. + List> batch2 = new ArrayList<>(); + for (int i = 0; i < batchSize; i++) { + batch2.add(client.prepareGet(httpsUrl("/ok")) + .execute() + .toCompletableFuture()); + } + for (CompletableFuture f : batch2) { + assertEquals(200, f.get(10, SECONDS).getStatusCode()); + } + } + } + + // ========================================================================= + // BUG 2 REGRESSION: Concurrent Requests on Same H2 Connection Must All Complete + // + // Scenario: Two or more concurrent requests share the same HTTP/2 parent + // channel. If the parent channel closes unexpectedly (e.g., TCP reset), + // ALL concurrent futures must be failed/retried — not just the last one + // whose future was set on the parent channel attribute. + // + // The fix: setAttribute is skipped for HTTP/2 parent channels. + // ========================================================================= + + @Test + public void allConcurrentRequestsFailOnConnectionDrop() throws Exception { + // Server holds all streams open without responding, then we kill the TCP connection. + // Before the fix, only the LAST request's future (the one set on the parent channel + // attribute) would be properly handled. All other futures would hang until timeout. + startServerWithHandler(() -> new HoldOpenHandler()); + + try (AsyncHttpClient client = asyncHttpClient(config() + .setUseInsecureTrustManager(true) + .setHttp2Enabled(true) + .setRequestTimeout(Duration.ofSeconds(10)) + .setMaxConnectionsPerHost(1))) { + + int numRequests = 5; + CountDownLatch allStarted = new CountDownLatch(numRequests); + List> futures = new ArrayList<>(); + + // Fire 5 concurrent requests — all will be multiplexed on the same H2 connection + for (int i = 0; i < numRequests; i++) { + CompletableFuture f = client.prepareGet(httpsUrl("/hold")) + .execute(new AsyncCompletionHandlerBase() { + @Override + public AsyncHandler.State onStatusReceived(HttpResponseStatus responseStatus) throws Exception { + allStarted.countDown(); + return super.onStatusReceived(responseStatus); + } + }) + .toCompletableFuture(); + futures.add(f); + } + + // Give streams time to be opened on the server side + Thread.sleep(500); + + // Kill the TCP connection from the server side + serverChildChannels.close().sync(); + + // ALL futures must complete (with an error, not a timeout). + // Before the fix, only the last future would be failed; the others + // would hang for 10 seconds until RequestTimeoutTimerTask fires. + long startTime = System.currentTimeMillis(); + int failedCount = 0; + for (CompletableFuture f : futures) { + try { + f.get(5, SECONDS); + } catch (ExecutionException e) { + failedCount++; + } + } + long elapsed = System.currentTimeMillis() - startTime; + + assertEquals(numRequests, failedCount, "All concurrent requests should fail on connection drop"); + + // Critical assertion: the failures should happen quickly (within ~2 seconds), + // not after the 10-second request timeout. Before the fix, orphaned futures + // would only complete via the timeout timer. + assertTrue(elapsed < 5_000, + "Failures should happen quickly (got " + elapsed + "ms), not wait for request timeout"); + } + } + + @Test + public void concurrentMultiplexedRequestsAllSucceed() throws Exception { + // Verify that many concurrent requests on the same H2 connection all succeed + // when there are no connection issues. This would fail before the fix if the + // parent channel attribute overwrite caused state corruption. + startSimpleServer(); + + try (AsyncHttpClient client = asyncHttpClient(config() + .setUseInsecureTrustManager(true) + .setHttp2Enabled(true) + .setMaxConnectionsPerHost(1) + .setRequestTimeout(Duration.ofSeconds(10)))) { + + int numRequests = 50; + AtomicInteger successCount = new AtomicInteger(0); + AtomicReference firstError = new AtomicReference<>(); + CountDownLatch latch = new CountDownLatch(numRequests); + + for (int i = 0; i < numRequests; i++) { + client.prepareGet(httpsUrl("/ok")) + .execute() + .toCompletableFuture() + .whenComplete((response, error) -> { + if (error != null) { + firstError.compareAndSet(null, error); + } else if (response.getStatusCode() == 200) { + successCount.incrementAndGet(); + } + latch.countDown(); + }); + } + + assertTrue(latch.await(30, SECONDS)); + assertNull(firstError.get(), "No errors expected, got: " + firstError.get()); + assertEquals(numRequests, successCount.get(), + "All multiplexed requests should succeed"); + } + } + + // ========================================================================= + // BUG 3 REGRESSION: Stream Channel Inactive Must Fail Future Immediately + // + // Scenario: An HTTP/2 stream is opened and the request is sent, but before + // the response arrives, the TCP connection drops. The stream channel goes + // inactive. Before the fix, the empty handleChannelInactive() meant the + // future was never failed, and it would hang until the request timeout fired. + // + // The fix: handleChannelInactive() now calls readFailed() which aborts + // the future and releases the stream slot. + // ========================================================================= + + @Test + public void streamChannelInactiveFailsFutureImmediately() throws Exception { + // Server opens the stream, waits 200ms, then kills the TCP connection. + // The client should detect the stream channel going inactive and fail + // the future immediately — NOT wait for the request timeout. + startServerWithHandler(() -> new DelayThenDropHandler(200)); + + try (AsyncHttpClient client = asyncHttpClient(config() + .setUseInsecureTrustManager(true) + .setHttp2Enabled(true) + .setRequestTimeout(Duration.ofSeconds(30)))) { + + long startTime = System.currentTimeMillis(); + try { + client.prepareGet(httpsUrl("/drop")) + .execute() + .get(10, SECONDS); + fail("Should have thrown — server dropped the connection before responding"); + } catch (ExecutionException e) { + long elapsed = System.currentTimeMillis() - startTime; + + // The request should fail quickly (within ~3 seconds), not after 30s timeout. + // Before the fix, with a 30s request timeout, the future would hang for the + // full 30 seconds because handleChannelInactive was a no-op. + assertTrue(elapsed < 5_000, + "Request should fail quickly on connection drop, but took " + elapsed + "ms. " + + "This suggests handleChannelInactive is not properly failing the future."); + + // The exception should be an IOException, not a TimeoutException + Throwable cause = e.getCause(); + assertNotNull(cause); + assertFalse(cause instanceof java.util.concurrent.TimeoutException, + "Should NOT fail with TimeoutException — should get IOException from channel close"); + } + } + } + + @Test + public void multipleStreamChannelInactivesAllResolveQuickly() throws Exception { + // Fire multiple requests, then kill the connection. ALL futures should + // resolve quickly via handleChannelInactive, not via request timeout. + startServerWithHandler(() -> new HoldOpenHandler()); + + try (AsyncHttpClient client = asyncHttpClient(config() + .setUseInsecureTrustManager(true) + .setHttp2Enabled(true) + .setRequestTimeout(Duration.ofSeconds(30)) + .setMaxConnectionsPerHost(1))) { + + int numRequests = 10; + List> futures = new ArrayList<>(); + + for (int i = 0; i < numRequests; i++) { + futures.add(client.prepareGet(httpsUrl("/hold")) + .execute() + .toCompletableFuture()); + } + + // Let streams get established + Thread.sleep(500); + + // Kill the server connection + long killTime = System.currentTimeMillis(); + serverChildChannels.close().sync(); + + // All futures should resolve quickly + int failedCount = 0; + for (CompletableFuture f : futures) { + try { + f.get(5, SECONDS); + } catch (Exception e) { + failedCount++; + } + } + long elapsed = System.currentTimeMillis() - killTime; + + assertTrue(failedCount > 0, "At least some requests should have failed"); + assertTrue(elapsed < 10_000, + "All futures should resolve within 10s of connection kill, took " + elapsed + "ms"); + } + } + + @Test + public void requestsSucceedAfterStreamChannelInactive() throws Exception { + // After a connection drop (which exercises handleChannelInactive), subsequent + // requests should succeed on a new connection — verifying that the stream slots + // are properly released and the client recovers. + startServerWithHandler(() -> new DelayThenDropHandler(100)); + + try (AsyncHttpClient client = asyncHttpClient(config() + .setUseInsecureTrustManager(true) + .setHttp2Enabled(true) + .setRequestTimeout(Duration.ofSeconds(5)))) { + + // First request: will fail because server drops the connection + try { + client.prepareGet(httpsUrl("/drop")) + .execute() + .get(5, SECONDS); + } catch (ExecutionException e) { + // Expected + } + + // Restart the server with a normal handler + serverChildChannels.close().sync(); + serverChannel.close().sync(); + Thread.sleep(200); + startSimpleServer(); + + // Subsequent requests should succeed — the stream slots from the failed + // request should have been released by handleChannelInactive -> finishUpdate + for (int i = 0; i < 5; i++) { + Response response = client.prepareGet(httpsUrl("/ok")) + .execute() + .get(10, SECONDS); + assertEquals(200, response.getStatusCode(), + "Request #" + i + " should succeed after connection recovery"); + } + } + } + + // ========================================================================= + // BUG 4 REGRESSION: Pending Opener Race Condition Under High Concurrency + // + // Scenario: With a low maxConcurrentStreams (e.g., 2), fire many requests. + // Those beyond the limit are queued as pending openers. Before the fix, the + // TOCTOU race in addPendingOpener could cause one thread's opener to consume + // another thread's stream slot, or run an opener without incrementing + // activeStreams. + // + // The fix: addPendingOpener and drainPendingOpeners use synchronized blocks. + // ========================================================================= + + @Test + public void highConcurrencyWithLowMaxStreamsDoesNotDeadlock() throws Exception { + // Fire many concurrent requests with a client-side maxConcurrentStreams limit. + // Requests beyond the limit are queued as pending openers. + // Before the fix, the race condition in addPendingOpener could cause: + // 1. Openers running without stream slots (exceeding the server's max) + // 2. Openers stuck in the queue permanently (deadlock) + // 3. activeStreams count going negative (allowing too many streams) + startSimpleServer(); + + // Use a low client-side maxConcurrentStreams to force pending opener queuing. + // The server allows many streams, so any failures are from client-side bugs. + try (AsyncHttpClient client = asyncHttpClient(config() + .setUseInsecureTrustManager(true) + .setHttp2Enabled(true) + .setHttp2MaxConcurrentStreams(3) + .setRequestTimeout(Duration.ofSeconds(10)) + .setMaxConnectionsPerHost(1))) { + + int numRequests = 30; + AtomicInteger successCount = new AtomicInteger(0); + AtomicReference firstError = new AtomicReference<>(); + CountDownLatch latch = new CountDownLatch(numRequests); + + for (int i = 0; i < numRequests; i++) { + client.prepareGet(httpsUrl("/ok")) + .execute() + .toCompletableFuture() + .whenComplete((response, error) -> { + if (error != null) { + firstError.compareAndSet(null, error); + } else if (response.getStatusCode() == 200) { + successCount.incrementAndGet(); + } + latch.countDown(); + }); + } + + assertTrue(latch.await(30, SECONDS), "All requests should complete within 30s"); + assertNull(firstError.get(), + "No errors expected with low MAX_CONCURRENT_STREAMS, got: " + firstError.get()); + assertEquals(numRequests, successCount.get()); + } + } + + @Test + public void repeatedBurstsWithLowMaxStreamsDoNotLeakSlots() throws Exception { + // Send multiple bursts of sequential requests through a connection with low + // client-side maxConcurrentStreams. Each request exercises the acquire/release + // cycle. Before the fix, the race condition in addPendingOpener could cause + // slot leaks that accumulate across bursts, eventually deadlocking the connection. + startSimpleServer(); + + try (AsyncHttpClient client = asyncHttpClient(config() + .setUseInsecureTrustManager(true) + .setHttp2Enabled(true) + .setHttp2MaxConcurrentStreams(3) + .setRequestTimeout(Duration.ofSeconds(10)) + .setMaxConnectionsPerHost(1))) { + + // Send multiple bursts of sequential requests through the same connection. + // With maxConcurrentStreams=3, each request goes through the acquire/release + // cycle. If stream slots leak, later bursts will deadlock. + for (int burst = 0; burst < 5; burst++) { + int burstSize = 15; + for (int i = 0; i < burstSize; i++) { + Response response = client.prepareGet(httpsUrl("/ok")) + .execute() + .get(10, SECONDS); + assertEquals(200, response.getStatusCode(), + "Request #" + i + " in burst #" + burst + " should succeed"); + } + } + } + } + + // ========================================================================= + // GOAWAY REGRESSION: GOAWAY During Active Streams + // + // Scenario: Server sends GOAWAY while streams are active. The client must: + // 1. Not open new streams on the draining connection + // 2. Allow existing streams to complete + // 3. Retry failed streams on a new connection + // 4. Not leak stream slots + // ========================================================================= + + @Test + public void goawayDuringActiveStreamsRecoveryIsClean() throws Exception { + // Start a server that holds streams open. Send requests, then send GOAWAY + // from the server side. Verify new requests succeed on a new connection. + AtomicReference capturedParentChannel = new AtomicReference<>(); + + ServerBootstrap b = new ServerBootstrap() + .group(serverGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) { + serverChildChannels.add(ch); + capturedParentChannel.set(ch); + ch.pipeline() + .addLast("ssl", serverSslCtx.newHandler(ch.alloc())) + .addLast(Http2FrameCodecBuilder.forServer().build()) + .addLast(new Http2MultiplexHandler(new ChannelInitializer() { + @Override + protected void initChannel(Http2StreamChannel streamCh) { + streamCh.pipeline().addLast(new SimpleOkHandler()); + } + })); + } + }); + + serverChannel = b.bind(0).sync().channel(); + serverPort = ((java.net.InetSocketAddress) serverChannel.localAddress()).getPort(); + + try (AsyncHttpClient client = asyncHttpClient(config() + .setUseInsecureTrustManager(true) + .setHttp2Enabled(true) + .setRequestTimeout(Duration.ofSeconds(5)))) { + + // Establish connection + Response r1 = client.prepareGet(httpsUrl("/ok")).execute().get(10, SECONDS); + assertEquals(200, r1.getStatusCode()); + + // Send GOAWAY from the server to the client's parent connection + Channel parent = capturedParentChannel.get(); + assertNotNull(parent); + parent.writeAndFlush(new DefaultHttp2GoAwayFrame(Http2Error.NO_ERROR)); + + // Brief pause for GOAWAY to propagate + Thread.sleep(300); + + // New requests should succeed — the client should open a new connection + // since the old one is draining + for (int i = 0; i < 5; i++) { + Response response = client.prepareGet(httpsUrl("/ok")) + .execute() + .get(10, SECONDS); + assertEquals(200, response.getStatusCode(), + "Request #" + i + " should succeed on new connection after GOAWAY"); + } + } + } + + // ========================================================================= + // COMBINED REGRESSION: The ~0.5% Timeout Scenario + // + // These tests simulate the exact production scenario described in the bug + // report — high concurrency with periodic server disruptions — and verify + // that 0% of requests time out silently. + // ========================================================================= + + @Test + public void zeroPctTimeoutsUnderConcurrencyWithServerDisruptions() throws Exception { + // This is the end-to-end regression test for the reported ~0.5% timeout rate. + // We fire 200 requests across 4 waves, with server restarts between each wave. + // Before the fixes, some requests would silently time out due to: + // - Leaked stream slots (Bug 1) + // - Orphaned futures from attribute overwrite (Bug 2) + // - Missing channelInactive handling (Bug 3) + // - Pending opener deadlocks (Bug 4) + startSimpleServer(); + + try (AsyncHttpClient client = asyncHttpClient(config() + .setUseInsecureTrustManager(true) + .setHttp2Enabled(true) + .setRequestTimeout(Duration.ofSeconds(10)) + .setMaxConnections(10) + .setMaxConnectionsPerHost(5))) { + + int wavesCount = 4; + int requestsPerWave = 50; + int totalRequests = wavesCount * requestsPerWave; + AtomicInteger totalSuccess = new AtomicInteger(0); + AtomicInteger totalFailed = new AtomicInteger(0); + List errors = new CopyOnWriteArrayList<>(); + + for (int wave = 0; wave < wavesCount; wave++) { + CountDownLatch waveLatch = new CountDownLatch(requestsPerWave); + + for (int i = 0; i < requestsPerWave; i++) { + client.prepareGet(httpsUrl("/ok")) + .execute() + .toCompletableFuture() + .whenComplete((response, error) -> { + if (error != null) { + totalFailed.incrementAndGet(); + errors.add(error); + } else if (response.getStatusCode() == 200) { + totalSuccess.incrementAndGet(); + } + waveLatch.countDown(); + }); + } + + assertTrue(waveLatch.await(30, SECONDS), + "Wave " + wave + " timed out waiting for completion"); + + // Restart the server between waves (except after the last one) + if (wave < wavesCount - 1) { + serverChildChannels.close().sync(); + serverChannel.close().sync(); + Thread.sleep(100); + startSimpleServer(); + } + } + + // No silent timeouts should occur + assertTrue(errors.isEmpty(), + "Expected 0 errors across " + totalRequests + " requests with disruptions, got " + + errors.size() + ": " + (errors.isEmpty() ? "" : errors.get(0).getMessage())); + assertEquals(totalRequests, totalSuccess.get(), + "All requests should succeed (some may have retried)"); + } + } + + @Test + public void clientRecoversAfterRepeatedConnectionDrops() throws Exception { + // After each connection drop, verify the client can still make new requests. + // Before the fixes, leaked stream slots and unhandled channelInactive would + // cause the client to permanently lose the ability to make requests. + startSimpleServer(); + + try (AsyncHttpClient client = asyncHttpClient(config() + .setUseInsecureTrustManager(true) + .setHttp2Enabled(true) + .setRequestTimeout(Duration.ofSeconds(5)) + .setMaxConnections(10) + .setMaxConnectionsPerHost(5))) { + + for (int cycle = 0; cycle < 5; cycle++) { + // Send a batch of successful requests + for (int i = 0; i < 10; i++) { + Response response = client.prepareGet(httpsUrl("/ok")) + .execute() + .get(10, SECONDS); + assertEquals(200, response.getStatusCode(), + "Request #" + i + " in cycle #" + cycle + " should succeed"); + } + + // Kill all server connections (simulates network disruption) + serverChildChannels.close().sync(); + Thread.sleep(200); + + // The next request may fail (in-flight or stale connection) — that's OK. + // What matters is the client recovers and subsequent requests succeed. + try { + client.prepareGet(httpsUrl("/ok")) + .execute() + .get(5, SECONDS); + } catch (ExecutionException e) { + // Expected — the connection was just killed + } + + // Client MUST recover: subsequent requests should succeed on new connections + for (int i = 0; i < 5; i++) { + Response response = client.prepareGet(httpsUrl("/ok")) + .execute() + .get(10, SECONDS); + assertEquals(200, response.getStatusCode(), + "Recovery request #" + i + " after drop #" + cycle + " should succeed"); + } + } + } + } +} diff --git a/client/src/test/java/org/asynchttpclient/Http2StreamOrphanRegressionTest.java b/client/src/test/java/org/asynchttpclient/Http2StreamOrphanRegressionTest.java new file mode 100644 index 0000000000..a15b2213d6 --- /dev/null +++ b/client/src/test/java/org/asynchttpclient/Http2StreamOrphanRegressionTest.java @@ -0,0 +1,374 @@ +/* + * Copyright (c) 2014-2026 AsyncHttpClient Project. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.asynchttpclient; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http2.DefaultHttp2Headers; +import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame; +import io.netty.handler.codec.http2.DefaultHttp2ResetFrame; +import io.netty.handler.codec.http2.Http2DataFrame; +import io.netty.handler.codec.http2.Http2Error; +import io.netty.handler.codec.http2.Http2FrameCodecBuilder; +import io.netty.handler.codec.http2.Http2Headers; +import io.netty.handler.codec.http2.Http2HeadersFrame; +import io.netty.handler.codec.http2.Http2MultiplexHandler; +import io.netty.handler.codec.http2.Http2StreamChannel; +import io.netty.handler.ssl.ApplicationProtocolConfig; +import io.netty.handler.ssl.ApplicationProtocolNames; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.pkitesting.CertificateBuilder; +import io.netty.pkitesting.X509Bundle; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.concurrent.GlobalEventExecutor; +import org.asynchttpclient.netty.request.NettyRequest; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.asynchttpclient.Dsl.asyncHttpClient; +import static org.asynchttpclient.Dsl.config; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; + +/** + * Regression tests for the RESIDUAL HTTP/2 silent-orphan / blast-radius paths discovered while + * verifying the fix for Issue #2160 — paths NOT covered by {@link Http2MultiplexBugRegressionTest}. + *

+ * Each test FAILS on the first-pass fix (commit that only implemented Http2Handler.handleChannelInactive) + * and PASSES once the follow-up fixes ship: + *

    + *
  1. A request parked in {@code Http2ConnectionState.pendingOpeners} (waiting for a stream slot) + * must be failed when the parent connection drops — otherwise it hangs to the request timeout.
  2. + *
  3. A single stream's RST_STREAM / inactive must NOT close the parent connection and fail its + * sibling multiplexed streams (RFC 7540 §6.4 — RST_STREAM is stream-scoped).
  4. + *
  5. A stream slot acquired in writeHttp2Request must be released if the post-open hooks + * (onRequestSend / sendHttp2Frames) throw — otherwise the slot leaks and wedges the connection.
  6. + *
+ * Each test uses a request timeout LONGER than its {@code get()} timeout so a hang to the request + * timer is observable as a {@link TimeoutException} rather than a fast failure. + */ +public class Http2StreamOrphanRegressionTest { + + private NioEventLoopGroup serverGroup; + private Channel serverChannel; + private ChannelGroup serverChildChannels; + private SslContext serverSslCtx; + private int serverPort; + + @BeforeEach + public void startServer() throws Exception { + X509Bundle bundle = new CertificateBuilder() + .subject("CN=localhost") + .setIsCertificateAuthority(true) + .buildSelfSigned(); + + serverSslCtx = SslContextBuilder.forServer(bundle.toKeyManagerFactory()) + .applicationProtocolConfig(new ApplicationProtocolConfig( + ApplicationProtocolConfig.Protocol.ALPN, + ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE, + ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT, + ApplicationProtocolNames.HTTP_2, + ApplicationProtocolNames.HTTP_1_1)) + .build(); + + serverGroup = new NioEventLoopGroup(1); + serverChildChannels = new DefaultChannelGroup("h2-orphan-regression", GlobalEventExecutor.INSTANCE); + } + + @AfterEach + public void stopServer() throws InterruptedException { + if (serverChildChannels != null) { + serverChildChannels.close().sync(); + } + if (serverChannel != null) { + serverChannel.close().sync(); + } + if (serverGroup != null) { + serverGroup.shutdownGracefully(0, 100, TimeUnit.MILLISECONDS).sync(); + } + ReferenceCountUtil.release(serverSslCtx); + } + + private String httpsUrl(String path) { + return "https://localhost:" + serverPort + path; + } + + private void startServerWithHandler(StreamHandlerFactory factory) throws InterruptedException { + ServerBootstrap b = new ServerBootstrap() + .group(serverGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) { + serverChildChannels.add(ch); + ch.pipeline() + .addLast("ssl", serverSslCtx.newHandler(ch.alloc())) + .addLast(Http2FrameCodecBuilder.forServer().build()) + .addLast(new Http2MultiplexHandler(new ChannelInitializer() { + @Override + protected void initChannel(Http2StreamChannel streamCh) { + streamCh.pipeline().addLast(factory.create()); + } + })); + } + }); + + serverChannel = b.bind(0).sync().channel(); + serverPort = ((java.net.InetSocketAddress) serverChannel.localAddress()).getPort(); + } + + @FunctionalInterface + private interface StreamHandlerFactory { + SimpleChannelInboundHandler create(); + } + + /** Accepts the request but never responds — holds the stream open. */ + private static class HoldOpenHandler extends SimpleChannelInboundHandler { + @Override + protected void channelRead0(ChannelHandlerContext ctx, Object msg) { + // hold open + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + ctx.close(); + } + } + + /** Responds 200 OK with an empty body to every complete request. */ + private static class SimpleOkHandler extends SimpleChannelInboundHandler { + @Override + protected void channelRead0(ChannelHandlerContext ctx, Object msg) { + boolean endStream = (msg instanceof Http2HeadersFrame && ((Http2HeadersFrame) msg).isEndStream()) + || (msg instanceof Http2DataFrame && ((Http2DataFrame) msg).isEndStream()); + if (endStream) { + Http2Headers responseHeaders = new DefaultHttp2Headers().status("200"); + ctx.writeAndFlush(new DefaultHttp2HeadersFrame(responseHeaders, true)); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + ctx.close(); + } + } + + // ========================================================================= + // BLOCKER 1: pendingOpeners orphaned on connection drop (#2160 recurrence) + // ========================================================================= + @Test + public void queuedRequestsFailWhenConnectionDropsWhileWaitingForStreamSlot() throws Exception { + startServerWithHandler(HoldOpenHandler::new); + + // maxConcurrentStreams=1 => 1 active stream + (burst-1) parked in pendingOpeners. + // Request timeout (30s) >> get() timeout (6s): a hang to the request timer is observable. + try (AsyncHttpClient client = asyncHttpClient(config() + .setUseInsecureTrustManager(true) + .setHttp2Enabled(true) + .setHttp2MaxConcurrentStreams(1) + .setMaxConnections(1) + .setMaxConnectionsPerHost(1) + .setRequestTimeout(Duration.ofSeconds(30)))) { + + int burst = 6; + List> futures = new ArrayList<>(); + for (int i = 0; i < burst; i++) { + futures.add(client.prepareGet(httpsUrl("/hold")).execute().toCompletableFuture()); + } + + // Let the single active stream establish and the rest queue in pendingOpeners. + Thread.sleep(1000); + + long killTime = System.currentTimeMillis(); + serverChildChannels.close().sync(); + + int orphaned = 0; + for (CompletableFuture f : futures) { + try { + f.get(6, SECONDS); + } catch (ExecutionException e) { + // failed fast = acceptable + } catch (TimeoutException te) { + orphaned++; // hung past 6s -> only the request timer would rescue it = #2160 + } + } + long elapsed = System.currentTimeMillis() - killTime; + + assertEquals(0, orphaned, + "ORPHAN GAP: " + orphaned + "/" + burst + " futures queued in pendingOpeners hung after " + + "connection drop (elapsed " + elapsed + "ms) — they must be failed on parent close."); + } + } + + // ========================================================================= + // BLOCKER 2: a single RST_STREAM must not tear down sibling multiplexed streams + // ========================================================================= + @Test + public void rstStreamDoesNotFailSiblingStreams() throws Exception { + // Server RSTs the FIRST stream to complete its request, holds all others open. + AtomicInteger streamCounter = new AtomicInteger(0); + startServerWithHandler(() -> new SimpleChannelInboundHandler() { + @Override + protected void channelRead0(ChannelHandlerContext ctx, Object msg) { + boolean endStream = (msg instanceof Http2HeadersFrame && ((Http2HeadersFrame) msg).isEndStream()) + || (msg instanceof Http2DataFrame && ((Http2DataFrame) msg).isEndStream()); + if (endStream && streamCounter.incrementAndGet() == 1) { + ctx.writeAndFlush(new DefaultHttp2ResetFrame(Http2Error.INTERNAL_ERROR)); + } + // other streams: hold open + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + ctx.close(); + } + }); + + try (AsyncHttpClient client = asyncHttpClient(config() + .setUseInsecureTrustManager(true) + .setHttp2Enabled(true) + .setMaxConnectionsPerHost(1) + .setRequestTimeout(Duration.ofSeconds(30)))) { + + int numRequests = 4; + List> futures = new ArrayList<>(); + for (int i = 0; i < numRequests; i++) { + futures.add(client.prepareGet(httpsUrl("/r" + i)).execute().toCompletableFuture()); + Thread.sleep(50); // stagger so the reset stream is clearly "stream 1" server-side + } + + // Let the RST land; the rest stay held open. + Thread.sleep(1500); + + int stillPending = 0; + for (CompletableFuture f : futures) { + if (!f.isDone()) { + stillPending++; + } + } + + // RFC-correct: exactly the reset stream fails; the other 3 remain open on a live connection. + assertEquals(numRequests - 1, stillPending, + "RST_STREAM BLAST RADIUS: one RST_STREAM closed the whole H2 connection — only " + + stillPending + "/" + numRequests + " sibling streams survived (expected " + + (numRequests - 1) + "). RST_STREAM must be stream-scoped (RFC 7540 §6.4)."); + } + } + + // ========================================================================= + // BLOCKER 3: stream slot must be released if a post-open hook throws + // ========================================================================= + @Test + public void streamSlotReleasedWhenRequestSendHandlerCrashes() throws Exception { + startServerWithHandler(SimpleOkHandler::new); + + try (AsyncHttpClient client = asyncHttpClient(config() + .setUseInsecureTrustManager(true) + .setHttp2Enabled(true) + .setHttp2MaxConcurrentStreams(1) + .setMaxConnections(1) + .setMaxConnectionsPerHost(1) + .setRequestTimeout(Duration.ofSeconds(8)))) { + + // Establish the H2 connection. + assertEquals(200, client.prepareGet(httpsUrl("/ok")).execute().get(10, SECONDS).getStatusCode()); + + // A request whose handler throws in onRequestSend — AFTER the single stream slot is + // acquired in writeHttp2Request — drives openHttp2Stream's catch path. Without the fix + // the slot is never released, pinning activeStreams at max=1. + try { + client.prepareGet(httpsUrl("/ok")) + .execute(new AsyncCompletionHandlerBase() { + @Override + public void onRequestSend(NettyRequest request) { + throw new RuntimeException("boom in onRequestSend"); + } + }) + .get(10, SECONDS); + fail("crashing request should have failed"); + } catch (ExecutionException expected) { + // expected — the crashing request fails + } + + // A subsequent normal request must still succeed. With a leaked slot, activeStreams stays + // pinned at 1 and this request queues in pendingOpeners forever, timing out at 8s. + Response r = client.prepareGet(httpsUrl("/ok")).execute().get(8, SECONDS); + assertEquals(200, r.getStatusCode(), + "Stream slot leaked when onRequestSend threw: the connection is wedged and " + + "subsequent requests can never acquire a stream slot."); + } + } + + // ========================================================================= + // SANITY GUARD: the original fix must keep working — established streams fail fast on drop. + // ========================================================================= + @Test + public void establishedStreamsFailFastOnConnectionDrop() throws Exception { + startServerWithHandler(HoldOpenHandler::new); + + try (AsyncHttpClient client = asyncHttpClient(config() + .setUseInsecureTrustManager(true) + .setHttp2Enabled(true) + .setHttp2MaxConcurrentStreams(1000) // every request becomes an active stream (no queuing) + .setMaxConnectionsPerHost(1) + .setRequestTimeout(Duration.ofSeconds(30)))) { + + int numRequests = 8; + List> futures = new ArrayList<>(); + for (int i = 0; i < numRequests; i++) { + futures.add(client.prepareGet(httpsUrl("/hold")).execute().toCompletableFuture()); + } + + Thread.sleep(800); + long killTime = System.currentTimeMillis(); + serverChildChannels.close().sync(); + + int orphaned = 0; + for (CompletableFuture f : futures) { + try { + f.get(6, SECONDS); + } catch (ExecutionException e) { + // fast failure ok + } catch (TimeoutException te) { + orphaned++; + } + } + long elapsed = System.currentTimeMillis() - killTime; + assertEquals(0, orphaned, + orphaned + "/" + numRequests + " established-stream futures hung past 6s (elapsed " + + elapsed + "ms) after connection drop."); + } + } +} diff --git a/client/src/test/java/org/asynchttpclient/netty/channel/Http2ConnectionStateTest.java b/client/src/test/java/org/asynchttpclient/netty/channel/Http2ConnectionStateTest.java new file mode 100644 index 0000000000..c905f28e8f --- /dev/null +++ b/client/src/test/java/org/asynchttpclient/netty/channel/Http2ConnectionStateTest.java @@ -0,0 +1,652 @@ +/* + * Copyright (c) 2014-2026 AsyncHttpClient Project. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.asynchttpclient.netty.channel; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * Unit tests for {@link Http2ConnectionState} verifying stream semaphore lifecycle, + * pending opener behavior, draining state, and thread safety under concurrent access. + */ +public class Http2ConnectionStateTest { + + // ------------------------------------------------------------------------- + // Basic tryAcquireStream / releaseStream lifecycle + // ------------------------------------------------------------------------- + + @Test + public void initialActiveStreamsIsZero() { + Http2ConnectionState state = new Http2ConnectionState(); + assertEquals(0, state.getActiveStreams()); + } + + @Test + public void acquireStreamIncrementsCount() { + Http2ConnectionState state = new Http2ConnectionState(); + assertTrue(state.tryAcquireStream()); + assertEquals(1, state.getActiveStreams()); + } + + @Test + public void releaseStreamDecrementsCount() { + Http2ConnectionState state = new Http2ConnectionState(); + state.tryAcquireStream(); + assertEquals(1, state.getActiveStreams()); + + state.releaseStream(); + assertEquals(0, state.getActiveStreams()); + } + + @Test + public void acquireAndReleaseMultipleStreams() { + Http2ConnectionState state = new Http2ConnectionState(); + state.updateMaxConcurrentStreams(10); + + for (int i = 0; i < 5; i++) { + assertTrue(state.tryAcquireStream()); + } + assertEquals(5, state.getActiveStreams()); + + for (int i = 0; i < 5; i++) { + state.releaseStream(); + } + assertEquals(0, state.getActiveStreams()); + } + + @Test + public void acquireUpToMaxConcurrentStreams() { + Http2ConnectionState state = new Http2ConnectionState(); + state.updateMaxConcurrentStreams(3); + + assertTrue(state.tryAcquireStream()); + assertTrue(state.tryAcquireStream()); + assertTrue(state.tryAcquireStream()); + assertFalse(state.tryAcquireStream(), "Should not acquire beyond maxConcurrentStreams"); + assertEquals(3, state.getActiveStreams()); + } + + @Test + public void acquireFailsAtLimit() { + Http2ConnectionState state = new Http2ConnectionState(); + state.updateMaxConcurrentStreams(1); + + assertTrue(state.tryAcquireStream()); + assertFalse(state.tryAcquireStream()); + assertEquals(1, state.getActiveStreams()); + } + + @Test + public void releaseAllowsSubsequentAcquire() { + Http2ConnectionState state = new Http2ConnectionState(); + state.updateMaxConcurrentStreams(1); + + assertTrue(state.tryAcquireStream()); + assertFalse(state.tryAcquireStream()); + + state.releaseStream(); + assertTrue(state.tryAcquireStream()); + assertEquals(1, state.getActiveStreams()); + } + + // ------------------------------------------------------------------------- + // Stream semaphore leak simulation (Bug 1 scenario) + // ------------------------------------------------------------------------- + + @Test + public void streamSlotReleasedOnSimulatedOpenFailure() { + Http2ConnectionState state = new Http2ConnectionState(); + state.updateMaxConcurrentStreams(2); + + // Acquire a stream slot (simulating tryAcquireStream() in writeHttp2Request) + assertTrue(state.tryAcquireStream()); + assertEquals(1, state.getActiveStreams()); + + // Simulate openHttp2Stream() failure — release must be called explicitly + state.releaseStream(); + assertEquals(0, state.getActiveStreams()); + + // Verify connection is not starved — we can still acquire + assertTrue(state.tryAcquireStream()); + assertTrue(state.tryAcquireStream()); + assertEquals(2, state.getActiveStreams()); + } + + @Test + public void cumulativeLeaksStarveConnection() { + Http2ConnectionState state = new Http2ConnectionState(); + state.updateMaxConcurrentStreams(3); + + // Simulate 3 leaked stream slots (acquire without release) + assertTrue(state.tryAcquireStream()); + assertTrue(state.tryAcquireStream()); + assertTrue(state.tryAcquireStream()); + + // Connection is completely blocked + assertFalse(state.tryAcquireStream()); + assertEquals(3, state.getActiveStreams()); + } + + @Test + public void releaseAfterLeakedSlotsRecoverConnection() { + Http2ConnectionState state = new Http2ConnectionState(); + state.updateMaxConcurrentStreams(2); + + // Simulate 2 leaks + assertTrue(state.tryAcquireStream()); + assertTrue(state.tryAcquireStream()); + assertFalse(state.tryAcquireStream()); + + // Fix the leak by releasing both + state.releaseStream(); + state.releaseStream(); + assertEquals(0, state.getActiveStreams()); + + // Connection is recovered + assertTrue(state.tryAcquireStream()); + } + + // ------------------------------------------------------------------------- + // Draining state (GOAWAY handling) + // ------------------------------------------------------------------------- + + @Test + public void acquireFailsWhenDraining() { + Http2ConnectionState state = new Http2ConnectionState(); + state.setDraining(100); + + assertTrue(state.isDraining()); + assertFalse(state.tryAcquireStream(), "Should not acquire stream on draining connection"); + } + + @Test + public void drainingPreservesExistingStreamCount() { + Http2ConnectionState state = new Http2ConnectionState(); + assertTrue(state.tryAcquireStream()); + assertTrue(state.tryAcquireStream()); + assertEquals(2, state.getActiveStreams()); + + state.setDraining(10); + assertTrue(state.isDraining()); + assertEquals(2, state.getActiveStreams(), "Existing streams should not be affected by draining"); + + // But new acquisitions should fail + assertFalse(state.tryAcquireStream()); + } + + @Test + public void drainingStoresLastStreamId() { + Http2ConnectionState state = new Http2ConnectionState(); + state.setDraining(42); + + assertEquals(42, state.getLastGoAwayStreamId()); + assertTrue(state.isDraining()); + } + + @Test + public void releaseStreamWhileDraining() { + Http2ConnectionState state = new Http2ConnectionState(); + assertTrue(state.tryAcquireStream()); + assertTrue(state.tryAcquireStream()); + + state.setDraining(10); + state.releaseStream(); + assertEquals(1, state.getActiveStreams()); + + state.releaseStream(); + assertEquals(0, state.getActiveStreams()); + } + + // ------------------------------------------------------------------------- + // Pending openers (Bug 4 fix validation) + // ------------------------------------------------------------------------- + + @Test + public void addPendingOpenerRunsImmediatelyWhenSlotAvailable() { + Http2ConnectionState state = new Http2ConnectionState(); + state.updateMaxConcurrentStreams(5); + + AtomicInteger executionCount = new AtomicInteger(0); + state.addPendingOpener(executionCount::incrementAndGet); + + assertEquals(1, executionCount.get(), "Opener should run immediately when slot available"); + assertEquals(1, state.getActiveStreams(), "Stream should be acquired for the opener"); + } + + @Test + public void addPendingOpenerQueuesWhenNoSlotAvailable() { + Http2ConnectionState state = new Http2ConnectionState(); + state.updateMaxConcurrentStreams(1); + + // Fill all slots + assertTrue(state.tryAcquireStream()); + assertEquals(1, state.getActiveStreams()); + + AtomicInteger executionCount = new AtomicInteger(0); + state.addPendingOpener(executionCount::incrementAndGet); + + assertEquals(0, executionCount.get(), "Opener should be queued, not executed"); + assertEquals(1, state.getActiveStreams()); + } + + @Test + public void pendingOpenerRunsOnRelease() { + Http2ConnectionState state = new Http2ConnectionState(); + state.updateMaxConcurrentStreams(1); + + // Fill the single slot + assertTrue(state.tryAcquireStream()); + + AtomicInteger executionCount = new AtomicInteger(0); + state.addPendingOpener(executionCount::incrementAndGet); + assertEquals(0, executionCount.get()); + + // Release the slot — the pending opener should be dequeued and run + state.releaseStream(); + assertEquals(1, executionCount.get(), "Pending opener should have been executed on release"); + } + + @Test + public void multiplePendingOpenersExecuteInOrder() { + Http2ConnectionState state = new Http2ConnectionState(); + state.updateMaxConcurrentStreams(1); + + // Fill the single slot + assertTrue(state.tryAcquireStream()); + + List executionOrder = Collections.synchronizedList(new ArrayList<>()); + state.addPendingOpener(() -> executionOrder.add(1)); + state.addPendingOpener(() -> executionOrder.add(2)); + state.addPendingOpener(() -> executionOrder.add(3)); + + assertTrue(executionOrder.isEmpty(), "No openers should run yet"); + + // Release slot 1 — first pending opener runs + state.releaseStream(); + assertEquals(1, executionOrder.size()); + assertEquals(1, executionOrder.get(0)); + + // Release slot 2 — second pending opener runs + state.releaseStream(); + assertEquals(2, executionOrder.size()); + assertEquals(2, executionOrder.get(1)); + + // Release slot 3 — third pending opener runs + state.releaseStream(); + assertEquals(3, executionOrder.size()); + assertEquals(3, executionOrder.get(2)); + } + + @Test + public void pendingOpenerDoesNotRunWhenDraining() { + Http2ConnectionState state = new Http2ConnectionState(); + state.updateMaxConcurrentStreams(1); + + // Fill the slot + assertTrue(state.tryAcquireStream()); + + AtomicInteger executionCount = new AtomicInteger(0); + state.addPendingOpener(executionCount::incrementAndGet); + + // Start draining before releasing + state.setDraining(10); + + // Release — pending opener should NOT run because draining prevents tryAcquireStream + state.releaseStream(); + assertEquals(0, executionCount.get(), "Pending opener should not run on a draining connection"); + } + + @Test + public void activeStreamCountCorrectWithPendingOpeners() { + Http2ConnectionState state = new Http2ConnectionState(); + state.updateMaxConcurrentStreams(2); + + // Acquire 2 slots + assertTrue(state.tryAcquireStream()); + assertTrue(state.tryAcquireStream()); + assertEquals(2, state.getActiveStreams()); + + // Queue 2 pending openers + AtomicInteger runCount = new AtomicInteger(0); + state.addPendingOpener(runCount::incrementAndGet); + state.addPendingOpener(runCount::incrementAndGet); + assertEquals(0, runCount.get()); + assertEquals(2, state.getActiveStreams()); + + // Release one — pending opener runs and acquires the slot + state.releaseStream(); + assertEquals(1, runCount.get()); + // Active streams: was 2, decremented to 1 by release, then incremented to 2 by pending opener + assertEquals(2, state.getActiveStreams()); + + // Release another — second pending opener runs + state.releaseStream(); + assertEquals(2, runCount.get()); + assertEquals(2, state.getActiveStreams()); + + // Release both remaining + state.releaseStream(); + state.releaseStream(); + assertEquals(0, state.getActiveStreams()); + } + + // ------------------------------------------------------------------------- + // MaxConcurrentStreams updates + // ------------------------------------------------------------------------- + + @Test + public void defaultMaxConcurrentStreamsIsMaxValue() { + Http2ConnectionState state = new Http2ConnectionState(); + assertEquals(Integer.MAX_VALUE, state.getMaxConcurrentStreams()); + } + + @Test + public void updateMaxConcurrentStreams() { + Http2ConnectionState state = new Http2ConnectionState(); + state.updateMaxConcurrentStreams(100); + assertEquals(100, state.getMaxConcurrentStreams()); + } + + @Test + public void reducingMaxConcurrentStreamsDoesNotAffectExistingStreams() { + Http2ConnectionState state = new Http2ConnectionState(); + state.updateMaxConcurrentStreams(10); + + for (int i = 0; i < 5; i++) { + assertTrue(state.tryAcquireStream()); + } + assertEquals(5, state.getActiveStreams()); + + // Reduce limit below current active count + state.updateMaxConcurrentStreams(3); + assertEquals(5, state.getActiveStreams(), "Existing streams should not be killed"); + + // New acquisitions should fail + assertFalse(state.tryAcquireStream()); + } + + @Test + public void increasingMaxConcurrentStreamsAllowsMoreAcquisitions() { + Http2ConnectionState state = new Http2ConnectionState(); + state.updateMaxConcurrentStreams(2); + + assertTrue(state.tryAcquireStream()); + assertTrue(state.tryAcquireStream()); + assertFalse(state.tryAcquireStream()); + + state.updateMaxConcurrentStreams(5); + assertTrue(state.tryAcquireStream()); + assertTrue(state.tryAcquireStream()); + assertTrue(state.tryAcquireStream()); + assertFalse(state.tryAcquireStream()); + assertEquals(5, state.getActiveStreams()); + } + + // ------------------------------------------------------------------------- + // Partition key + // ------------------------------------------------------------------------- + + @Test + public void partitionKeyIsNullByDefault() { + Http2ConnectionState state = new Http2ConnectionState(); + assertNull(state.getPartitionKey()); + } + + @Test + public void partitionKeyCanBeSetAndRetrieved() { + Http2ConnectionState state = new Http2ConnectionState(); + Object key = new Object(); + state.setPartitionKey(key); + assertSame(key, state.getPartitionKey()); + } + + // ------------------------------------------------------------------------- + // Concurrent stress tests (Bug 4 race condition validation) + // ------------------------------------------------------------------------- + + @Test + public void concurrentAcquireAndReleaseNeverExceedsMax() throws InterruptedException { + Http2ConnectionState state = new Http2ConnectionState(); + int maxStreams = 10; + state.updateMaxConcurrentStreams(maxStreams); + + int numThreads = 20; + int iterations = 1000; + ExecutorService executor = Executors.newFixedThreadPool(numThreads); + CyclicBarrier barrier = new CyclicBarrier(numThreads); + AtomicInteger maxObserved = new AtomicInteger(0); + AtomicInteger errors = new AtomicInteger(0); + + for (int t = 0; t < numThreads; t++) { + executor.submit(() -> { + try { + barrier.await(); + for (int i = 0; i < iterations; i++) { + if (state.tryAcquireStream()) { + int current = state.getActiveStreams(); + maxObserved.updateAndGet(prev -> Math.max(prev, current)); + if (current > maxStreams) { + errors.incrementAndGet(); + } + // Simulate some work + Thread.yield(); + state.releaseStream(); + } + } + } catch (Exception e) { + errors.incrementAndGet(); + } + }); + } + + executor.shutdown(); + assertTrue(executor.awaitTermination(30, TimeUnit.SECONDS)); + assertEquals(0, errors.get(), "activeStreams should never exceed maxConcurrentStreams"); + assertTrue(maxObserved.get() <= maxStreams, + "Max observed (" + maxObserved.get() + ") should not exceed max (" + maxStreams + ")"); + assertEquals(0, state.getActiveStreams(), "All streams should be released"); + } + + @Test + public void concurrentAddPendingOpenerAndReleaseStream() throws InterruptedException { + Http2ConnectionState state = new Http2ConnectionState(); + int maxStreams = 5; + state.updateMaxConcurrentStreams(maxStreams); + + int numThreads = 20; + int totalOpeners = 100; + AtomicInteger executedCount = new AtomicInteger(0); + CountDownLatch allSubmitted = new CountDownLatch(totalOpeners); + ExecutorService executor = Executors.newFixedThreadPool(numThreads); + + // Fill all slots first + for (int i = 0; i < maxStreams; i++) { + assertTrue(state.tryAcquireStream()); + } + + // Submit pending openers from multiple threads + for (int i = 0; i < totalOpeners; i++) { + executor.submit(() -> { + state.addPendingOpener(() -> { + executedCount.incrementAndGet(); + // Immediately release so next pending opener can run + state.releaseStream(); + }); + allSubmitted.countDown(); + }); + } + + assertTrue(allSubmitted.await(10, TimeUnit.SECONDS), "All openers should be submitted"); + + // Release the initial slots — this should cascade through all pending openers + for (int i = 0; i < maxStreams; i++) { + state.releaseStream(); + } + + // Give some time for cascading execution + Thread.sleep(500); + + assertEquals(totalOpeners, executedCount.get(), + "All pending openers should have been executed"); + assertEquals(0, state.getActiveStreams(), "All streams should be released after cascading"); + + executor.shutdown(); + assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS)); + } + + @Test + public void concurrentAddPendingOpenerMaintainsStreamCountInvariant() throws InterruptedException { + Http2ConnectionState state = new Http2ConnectionState(); + int maxStreams = 3; + state.updateMaxConcurrentStreams(maxStreams); + + int numThreads = 10; + int iterations = 200; + ExecutorService executor = Executors.newFixedThreadPool(numThreads); + CyclicBarrier barrier = new CyclicBarrier(numThreads); + AtomicInteger errors = new AtomicInteger(0); + + for (int t = 0; t < numThreads; t++) { + executor.submit(() -> { + try { + barrier.await(); + for (int i = 0; i < iterations; i++) { + CountDownLatch ran = new CountDownLatch(1); + state.addPendingOpener(() -> { + int active = state.getActiveStreams(); + if (active > maxStreams) { + errors.incrementAndGet(); + } + ran.countDown(); + state.releaseStream(); + }); + ran.await(5, TimeUnit.SECONDS); + } + } catch (Exception e) { + errors.incrementAndGet(); + } + }); + } + + executor.shutdown(); + assertTrue(executor.awaitTermination(30, TimeUnit.SECONDS)); + assertEquals(0, errors.get(), + "activeStreams should never exceed maxConcurrentStreams in pending openers"); + assertEquals(0, state.getActiveStreams()); + } + + @Test + public void activeStreamsNeverGoesNegative() throws InterruptedException { + Http2ConnectionState state = new Http2ConnectionState(); + state.updateMaxConcurrentStreams(100); + + int numThreads = 10; + int iterations = 500; + ExecutorService executor = Executors.newFixedThreadPool(numThreads); + CyclicBarrier barrier = new CyclicBarrier(numThreads); + AtomicInteger negativeObserved = new AtomicInteger(0); + + for (int t = 0; t < numThreads; t++) { + executor.submit(() -> { + try { + barrier.await(); + for (int i = 0; i < iterations; i++) { + if (state.tryAcquireStream()) { + state.releaseStream(); + int active = state.getActiveStreams(); + if (active < 0) { + negativeObserved.incrementAndGet(); + } + } + } + } catch (Exception e) { + // ignore + } + }); + } + + executor.shutdown(); + assertTrue(executor.awaitTermination(30, TimeUnit.SECONDS)); + assertEquals(0, negativeObserved.get(), "activeStreams should never go negative"); + assertTrue(state.getActiveStreams() >= 0); + } + + // ------------------------------------------------------------------------- + // Edge cases + // ------------------------------------------------------------------------- + + @Test + public void acquireWithZeroMaxConcurrentStreams() { + Http2ConnectionState state = new Http2ConnectionState(); + state.updateMaxConcurrentStreams(0); + + assertFalse(state.tryAcquireStream()); + assertEquals(0, state.getActiveStreams()); + } + + @Test + public void addPendingOpenerWithZeroMaxConcurrentStreams() { + Http2ConnectionState state = new Http2ConnectionState(); + state.updateMaxConcurrentStreams(0); + + AtomicInteger executionCount = new AtomicInteger(0); + state.addPendingOpener(executionCount::incrementAndGet); + + assertEquals(0, executionCount.get(), "Opener should be queued when max is 0"); + + // Increase the limit and release to trigger + state.updateMaxConcurrentStreams(1); + // Need a releaseStream to drain pending — but first we need to have acquired + // Since we can't release without acquiring, let's acquire and immediately release + assertTrue(state.tryAcquireStream()); + state.releaseStream(); + + assertEquals(1, executionCount.get()); + } + + @Test + public void drainingPreventsNewAcquisitionsButAllowsRelease() { + Http2ConnectionState state = new Http2ConnectionState(); + state.updateMaxConcurrentStreams(5); + + assertTrue(state.tryAcquireStream()); + assertTrue(state.tryAcquireStream()); + assertEquals(2, state.getActiveStreams()); + + state.setDraining(100); + + // Cannot acquire new streams + assertFalse(state.tryAcquireStream()); + assertEquals(2, state.getActiveStreams()); + + // Can still release existing streams + state.releaseStream(); + assertEquals(1, state.getActiveStreams()); + state.releaseStream(); + assertEquals(0, state.getActiveStreams()); + } +}