Skip to content

Commit 123ee9e

Browse files
authored
KAFKA-20393: Fix stickyNode using stale IP when broker address changes (#21983)
When a broker's IP address changes (e.g., pod replacement in Kubernetes), `TelemetrySender.stickyNode` retains a stale `Node` object with the old address. Since `canSendRequest()` checks by node ID only, the stale connection passes the check and telemetry data is sent to the wrong host. This PR refreshes `stickyNode` against current metadata at the start of `TelemetrySender.maybeUpdate()`. If the node's address has changed, `stickyNode` is updated to the fresh `Node`. If the node no longer exists in metadata, `stickyNode` is cleared and reconnect backoff is returned. Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com> --------- Signed-off-by: Daeho Kwon <trewq231@naver.com>
1 parent e4cd243 commit 123ee9e

2 files changed

Lines changed: 111 additions & 0 deletions

File tree

clients/src/main/java/org/apache/kafka/clients/NetworkClient.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1393,6 +1393,13 @@ public long maybeUpdate(long now) {
13931393
if (timeToNextUpdate > 0)
13941394
return timeToNextUpdate;
13951395

1396+
// The node's connection parameters can change while keeping the same node id, so check whether the cached
1397+
// sticky node is still current; if it has changed, reset the sticky node.
1398+
if (stickyNode != null && isNodeChanged(stickyNode)) {
1399+
log.debug("Telemetry stickyNode {} either is no longer in metadata or changed, clearing it.", stickyNode);
1400+
stickyNode = null;
1401+
}
1402+
13961403
// Per KIP-714, let's continue to re-use the same broker for as long as possible.
13971404
if (stickyNode == null) {
13981405
stickyNode = leastLoadedNode(now).node();
@@ -1442,6 +1449,13 @@ private long maybeUpdate(long now, Node node) {
14421449
return Long.MAX_VALUE;
14431450
}
14441451

1452+
private boolean isNodeChanged(Node node) {
1453+
Node newNode = metadataUpdater.fetchNodes().stream()
1454+
.filter(n -> n.id() == node.id())
1455+
.findFirst().orElse(null);
1456+
return newNode == null || !newNode.equals(node);
1457+
}
1458+
14451459
/**
 * Forwards a GetTelemetrySubscriptions response to the client telemetry sender for processing.
 *
 * @param response the broker's response to a GetTelemetrySubscriptions request
 */
public void handleResponse(GetTelemetrySubscriptionsResponse response) {
    clientTelemetrySender.handleResponse(response);
}

clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import org.junit.jupiter.api.BeforeEach;
6060
import org.junit.jupiter.api.Test;
6161

62+
import java.io.IOException;
6263
import java.net.InetAddress;
6364
import java.net.InetSocketAddress;
6465
import java.net.UnknownHostException;
@@ -1450,6 +1451,102 @@ public void onComplete(ClientResponse response) {
14501451
}
14511452
}
14521453

1454+
/**
 * Regression test for KAFKA-20393: when a broker keeps its node id but changes its
 * IP address (e.g. Kubernetes pod replacement), the telemetry sender's stickyNode
 * must be refreshed from metadata rather than reconnecting to the stale address.
 * Verifies that the reconnect in poll 4 targets the fresh IP, not the stale one.
 */
@Test
public void testStickyNodeDoesNotUseStaleIpOnReconnect() throws UnknownHostException {
    String staleIp = "10.200.20.100";
    String freshIp = "10.200.20.200";
    // Both nodes share the same id to simulate a broker whose IP changed (e.g. pod replacement).
    Node staleNode = new Node(0, staleIp, 9092);
    Node freshNode = new Node(0, freshIp, 9092);

    List<InetSocketAddress> connectAttempts = new ArrayList<>();
    // boolean array so the anonymous subclass can mutate it
    boolean[] disconnectDuringPoll = {false};

    // Custom selector that (a) captures every connect address and
    // (b) can inject a server-side disconnect INSIDE selector.poll(), simulating a
    // disconnect detected after telemetrySender.maybeUpdate() already ran with the channel ready.
    MockSelector capturingSelector = new MockSelector(time) {
        @Override
        public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize)
                throws IOException {
            connectAttempts.add(address);
            super.connect(id, address, sendBufferSize, receiveBufferSize);
        }

        @Override
        public void poll(long timeout) throws IOException {
            if (disconnectDuringPoll[0]) {
                serverDisconnect("0");
                disconnectDuringPoll[0] = false;
            }
            super.poll(timeout);
        }
    };

    ClientTelemetrySender mockTelemetrySender = mock(ClientTelemetrySender.class);
    when(mockTelemetrySender.timeToNextUpdate(anyLong())).thenReturn(0L);
    when(mockTelemetrySender.createRequest()).thenReturn(Optional.empty());

    ManualMetadataUpdater updater = new ManualMetadataUpdater(Collections.singletonList(staleNode));

    NetworkClient testClient = new NetworkClient(
        updater, null, capturingSelector, "test-client",
        Integer.MAX_VALUE,
        0L, 0L, // reconnectBackoffMs = 0 for instant reconnect
        64 * 1024, 64 * 1024,
        defaultRequestTimeoutMs,
        connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest,
        time, false, new ApiVersions(), null,
        new LogContext(), new DefaultHostResolver(),
        mockTelemetrySender, Long.MAX_VALUE,
        MetadataRecoveryStrategy.NONE);

    long now = time.milliseconds();

    // Poll 1: stickyNode = null → staleNode; canSendRequest = false (not yet connected);
    // initiateConnect(staleNode); handleConnections → connectionStates["0"] = READY
    testClient.poll(0, now);
    assertEquals(1, connectAttempts.size());
    assertEquals(InetAddress.getByName(staleIp), connectAttempts.get(0).getAddress());
    connectAttempts.clear();

    // Poll 2: stickyNode = null → staleNode; canSendRequest = TRUE (READY);
    // createRequest() = empty → stickyNode = staleNode KEPT (not cleared)
    testClient.poll(0, now);
    assertTrue(connectAttempts.isEmpty(), "No new connect expected in poll 2");
    assertEquals(staleNode, testClient.telemetryConnectedNode());

    // Broker replaced: metadata now points to freshNode (new IP)
    updater.setNodes(Collections.singletonList(freshNode));

    // Schedule the disconnect to fire INSIDE selector.poll() on the next NetworkClient.poll().
    // This simulates a disconnect detected after telemetrySender.maybeUpdate() already ran:
    // - telemetrySender.maybeUpdate() runs first → channel still READY at this moment
    //   → fix detects host mismatch → stickyNode updated to freshNode
    // - selector.poll() triggers the disconnect
    // - handleDisconnections() sets connectionStates["0"] = DISCONNECTED
    disconnectDuringPoll[0] = true;

    // Poll 3: fix detects that stickyNode's host differs from metadata → stickyNode = freshNode;
    // selector.poll() fires serverDisconnect("0");
    // handleDisconnections → connectionStates["0"] = DISCONNECTED
    testClient.poll(0, now);
    assertTrue(connectAttempts.isEmpty(), "No new connect expected in poll 3");
    assertEquals(freshNode, testClient.telemetryConnectedNode(),
        "fix must update stickyNode to freshNode when host mismatch is detected");

    // Poll 4: stickyNode = freshNode, connectionStates["0"] = DISCONNECTED
    // canSendRequest = false → stickyNode = null
    // canConnect = true → initiateConnect with freshNode's IP
    // FIXED: connects to freshIp (10.200.20.200)
    testClient.poll(0, now);

    assertFalse(connectAttempts.isEmpty(), "Expected a reconnect attempt in poll 4");
    assertEquals(InetAddress.getByName(freshIp), connectAttempts.get(0).getAddress(),
        "Reconnect must use the fresh IP from updated metadata, not the stale IP from stickyNode");
}
1549+
14531550
// ManualMetadataUpdater with ability to keep track of failures
14541551
private static class TestMetadataUpdater extends ManualMetadataUpdater {
14551552
KafkaException failure;

0 commit comments

Comments
 (0)