Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@

import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Map;
Expand Down Expand Up @@ -353,8 +354,17 @@ public void registerHttp2Connection(Object partitionKey, Channel channel) {
state.setPartitionKey(partitionKey);
}
http2Connections.put(partitionKey, channel);
// Auto-remove from registry when the connection closes
channel.closeFuture().addListener(future -> removeHttp2Connection(partitionKey, channel));
// When the connection closes, remove it from the registry AND fail any requests still queued
// for a stream slot. Without the latter, requests sitting in pendingOpeners when the parent
// connection drops have no stream channel (so no channelInactive is ever delivered for them)
// and would survive only until the request timeout fires — the silent-timeout bug of #2160.
channel.closeFuture().addListener(future -> {
removeHttp2Connection(partitionKey, channel);
if (state != null) {
state.failPendingOpeners(orphan ->
orphan.abort(new IOException("HTTP/2 connection closed before a stream could be opened")));
}
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@
package org.asynchttpclient.netty.channel;

import io.netty.util.AttributeKey;
import org.asynchttpclient.netty.NettyResponseFuture;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/**
* Tracks per-connection HTTP/2 state: active stream count, max concurrent streams,
Expand All @@ -30,15 +34,31 @@ public class Http2ConnectionState {
public static final AttributeKey<Http2ConnectionState> HTTP2_STATE_KEY =
AttributeKey.valueOf("http2ConnectionState");

/**
* A request waiting for a free stream slot. Carries both the future (so it can be failed if the
* connection dies before a slot frees up) and the action that actually opens the stream.
*/
private static final class PendingOpener {
final NettyResponseFuture<?> future;
final Runnable opener;

PendingOpener(NettyResponseFuture<?> future, Runnable opener) {
this.future = future;
this.opener = opener;
}
}

private final AtomicInteger activeStreams = new AtomicInteger(0);
private volatile int maxConcurrentStreams = Integer.MAX_VALUE;
private final AtomicBoolean draining = new AtomicBoolean(false);
private volatile int lastGoAwayStreamId = Integer.MAX_VALUE;
private final ConcurrentLinkedQueue<Runnable> pendingOpeners = new ConcurrentLinkedQueue<>();
private final ConcurrentLinkedQueue<PendingOpener> pendingOpeners = new ConcurrentLinkedQueue<>();
private final Object pendingLock = new Object();
private final AtomicBoolean closed = new AtomicBoolean(false);
private volatile Object partitionKey;

public boolean tryAcquireStream() {
if (draining.get()) {
if (draining.get() || closed.get()) {
return false;
}
while (true) {
Expand All @@ -54,22 +74,63 @@ public boolean tryAcquireStream() {

public void releaseStream() {
activeStreams.decrementAndGet();
// Try to dequeue and run a pending opener
Runnable pending = pendingOpeners.poll();
if (pending != null) {
pending.run();
}
drainPendingOpeners();
}

public void addPendingOpener(Runnable opener) {
pendingOpeners.add(opener);
// Re-check in case a stream was released between the failed tryAcquire and this enqueue
if (tryAcquireStream()) {
Runnable dequeued = pendingOpeners.poll();
if (dequeued != null) {
dequeued.run();
addPendingOpener(null, opener);
}

public void addPendingOpener(NettyResponseFuture<?> future, Runnable opener) {
synchronized (pendingLock) {
if (tryAcquireStream()) {
opener.run();
} else {
releaseStream();
pendingOpeners.add(new PendingOpener(future, opener));
}
}
}

private void drainPendingOpeners() {
synchronized (pendingLock) {
PendingOpener pending = pendingOpeners.poll();
if (pending != null) {
if (tryAcquireStream()) {
pending.opener.run();
} else {
// Put it back — another releaseStream() will pick it up
pendingOpeners.offer(pending);
}
}
}
}

/**
* Permanently marks the connection unusable and hands every queued (never-started) request to
* {@code failer}. After this call {@link #tryAcquireStream()} returns {@code false}, so a request
* enqueued concurrently with the close is failed by its own caller's post-enqueue active-channel
* check rather than being silently orphaned.
* <p>
* Without this, requests sitting in {@link #pendingOpeners} when the parent connection drops are
* never completed and survive only until the request timeout fires — the silent-timeout
* regression of Issue #2160 (a queued request has no stream channel, hence no channelInactive
* is ever delivered for it).
*
* @param failer invoked once per orphaned request future (e.g. to abort it)
*/
public void failPendingOpeners(Consumer<NettyResponseFuture<?>> failer) {
closed.set(true);
List<PendingOpener> drained = new ArrayList<>();
synchronized (pendingLock) {
PendingOpener p;
while ((p = pendingOpeners.poll()) != null) {
drained.add(p);
}
}
// Fail outside the lock — failer may re-enter client code.
for (PendingOpener p : drained) {
if (p.future != null) {
failer.accept(p.future);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,10 @@ private void handleHttp2TrailingHeadersFrame(Http2HeadersFrame headersFrame, Cha
*/
private void handleHttp2ResetFrame(Http2ResetFrame resetFrame, Channel channel, NettyResponseFuture<?> future) {
long errorCode = resetFrame.errorCode();
readFailed(channel, future, new IOException("HTTP/2 stream reset by server, error code: " + errorCode));
// RFC 7540 §5.4.2/§6.4: RST_STREAM is stream-scoped and MUST NOT terminate the connection.
// Fail only this stream's future and close only the (single-use) stream child channel —
// sibling streams multiplexed on the same parent connection must be left untouched.
streamFailed(channel, future, new IOException("HTTP/2 stream reset by server, error code: " + errorCode));
}

/**
Expand Down Expand Up @@ -300,11 +303,41 @@ private void readFailed(Channel channel, NettyResponseFuture<?> future, Throwabl
}
}

/**
* Fails a single stream's future WITHOUT closing the parent connection. Used for stream-scoped
* events (RST_STREAM, and the {@code channelInactive}/exception Netty delivers when one stream
* dies). {@link #finishUpdate} with {@code close=false} closes only the single-use stream child
* channel and releases its stream slot, leaving the parent connection and its sibling multiplexed
* streams untouched.
* <p>
* When the PARENT connection genuinely drops, Netty fires {@code channelInactive} on every child
* stream, so each in-flight future is still failed individually and promptly.
*/
private void streamFailed(Channel channel, NettyResponseFuture<?> future, Throwable t) {
if (future.isDone()) {
return;
}
try {
requestSender.abort(channel, future, t);
} catch (Exception abortException) {
logger.debug("Abort failed", abortException);
} finally {
finishUpdate(future, channel, false);
}
}

@Override
public void handleException(NettyResponseFuture<?> future, Throwable error) {
// Stream-scoped: an exception on one stream child channel must not tear down the parent
// connection that sibling multiplexed streams share (see streamFailed).
streamFailed(future.channel(), future, error);
}

@Override
public void handleChannelInactive(NettyResponseFuture<?> future) {
// Stream-scoped (see streamFailed): closing the parent here would fail unrelated sibling
// streams on the same connection — the RST_STREAM/single-stream-close blast-radius bug.
streamFailed(future.channel(), future,
new IOException("HTTP/2 stream channel closed unexpectedly"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,14 @@ private <T> ListenableFuture<T> sendRequestWithOpenChannel(NettyResponseFuture<T

// channelInactive might be called between isChannelValid and writeRequest
// so if we don't store the Future now, channelInactive won't perform
// handleUnexpectedClosedChannel
Channels.setAttribute(channel, future);
// handleUnexpectedClosedChannel.
// For HTTP/2, skip this: the parent connection multiplexes many concurrent requests, so a
// single per-request Future on the parent channel is meaningless — each request's Future is
// stored on its own stream child channel in openHttp2Stream(). The parent also has no
// AsyncHttpClientHandler after the H2 upgrade, so nothing reads this attribute anyway.
if (!ChannelManager.isHttp2(channel)) {
Channels.setAttribute(channel, future);
}

if (Channels.isChannelActive(channel)) {
writeRequest(future, channel);
Expand Down Expand Up @@ -485,7 +491,6 @@ public <T> void writeRequest(NettyResponseFuture<T> future, Channel channel) {
*/
private <T> void writeHttp2Request(NettyResponseFuture<T> future, Channel parentChannel) {
Http2ConnectionState state = parentChannel.attr(Http2ConnectionState.HTTP2_STATE_KEY).get();
Runnable openStream = () -> openHttp2Stream(future, parentChannel);

if (state != null && !state.tryAcquireStream()) {
if (state.isDraining()) {
Expand All @@ -495,13 +500,27 @@ private <T> void writeHttp2Request(NettyResponseFuture<T> future, Channel parent
return;
}
// Queue for later when a stream slot opens up
state.addPendingOpener(openStream);
state.addPendingOpener(future, () -> openHttp2Stream(future, parentChannel, state));
// The parent connection may have closed concurrently with the enqueue above; if so its
// close listener already drained the queue and this future would be orphaned (it has no
// stream channel, so no channelInactive is ever delivered for it). Detect that race and
// fail it here so it never survives only to the request timeout (Issue #2160).
if (!parentChannel.isActive() && !future.isDone()) {
abort(parentChannel, future,
new java.io.IOException("HTTP/2 connection closed while request was queued"));
}
return;
}
openStream.run();
openHttp2Stream(future, parentChannel, state);
}

private static void releaseHttp2Stream(Http2ConnectionState state) {
if (state != null) {
state.releaseStream();
}
}

private <T> void openHttp2Stream(NettyResponseFuture<T> future, Channel parentChannel) {
private <T> void openHttp2Stream(NettyResponseFuture<T> future, Channel parentChannel, Http2ConnectionState state) {
new Http2StreamChannelBootstrap(parentChannel)
.handler(new ChannelInitializer<Http2StreamChannel>() {
@Override
Expand All @@ -519,13 +538,19 @@ protected void initChannel(Http2StreamChannel streamCh) {
Http2StreamChannel streamChannel = f.getNow();
channelManager.registerOpenChannel(streamChannel);
Channels.setAttribute(streamChannel, future);
Channels.setActiveToken(streamChannel);
future.attachChannel(streamChannel, false);
try {
AsyncHandler<T> asyncHandler = future.getAsyncHandler();
try {
asyncHandler.onRequestSend(future.getNettyRequest());
} catch (Exception e) {
LOGGER.error("onRequestSend crashed", e);
// The slot was acquired before open(); aborting here completes the
// future, so the stream's later channelInactive won't run finishUpdate
// (its !future.isDone() guard fails). Release the slot explicitly,
// otherwise activeStreams leaks and eventually wedges the connection.
releaseHttp2Stream(state);
abort(streamChannel, future, e);
return;
}
Expand All @@ -538,9 +563,15 @@ protected void initChannel(Http2StreamChannel streamCh) {
scheduleReadTimeout(future);
} catch (Exception e) {
LOGGER.error("Can't write HTTP/2 request", e);
// See above: release the slot the failed stream will never release itself.
releaseHttp2Stream(state);
abort(streamChannel, future, e);
}
} else {
// Stream channel was never opened — release the acquired stream slot
if (state != null) {
state.releaseStream();
}
abort(parentChannel, future, f.cause());
}
});
Expand Down
Loading
Loading