This is an automated email from the ASF dual-hosted git repository.
mittal pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.2 by this push:
new 23c3da56e25 KAFKA-20393: Fix stickyNode using stale IP when broker address changes (#21983)
23c3da56e25 is described below
commit 23c3da56e257b9cf5e887ed0491413ac1f11e00d
Author: Daeho Kwon <[email protected]>
AuthorDate: Fri Apr 10 06:16:44 2026 +0900
KAFKA-20393: Fix stickyNode using stale IP when broker address changes (#21983)
When a broker's IP address changes (e.g., pod replacement in Kubernetes),
`TelemetrySender.stickyNode` retains a stale `Node` object with the old
address. Since `canSendRequest()` checks by node ID only, the stale
connection passes the check and telemetry data is sent to the wrong
host.
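This PR refreshes `stickyNode` against current metadata in
`TelemetrySender.maybeUpdate()`, before a node is selected for the
telemetry request. If the node's address has changed, the stale
`stickyNode` is cleared and a node is re-selected from current metadata,
so the fresh `Node` (with the new address) is used instead of the cached
one. If the node no longer exists in metadata, `stickyNode` is cleared
and reconnect backoff is returned.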
Reviewers: Apoorv Mittal <[email protected]>
---------
Signed-off-by: Daeho Kwon <[email protected]>
---
.../org/apache/kafka/clients/NetworkClient.java | 14 ++++
.../apache/kafka/clients/NetworkClientTest.java | 97 ++++++++++++++++++++++
2 files changed, 111 insertions(+)
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 692847a8b15..2194f22d63c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -1393,6 +1393,13 @@ public class NetworkClient implements KafkaClient {
if (timeToNextUpdate > 0)
return timeToNextUpdate;
+ // The node connection params can change while having the same
node id hence check if the cached
+ // sticky node has not changed, if changed then reset the sticky
node.
+ if (stickyNode != null && isNodeChanged(stickyNode)) {
+ log.debug("Telemetry stickyNode {} either is no longer in
metadata or changed, clearing it.", stickyNode);
+ stickyNode = null;
+ }
+
// Per KIP-714, let's continue to re-use the same broker for as
long as possible.
if (stickyNode == null) {
stickyNode = leastLoadedNode(now).node();
@@ -1442,6 +1449,13 @@ public class NetworkClient implements KafkaClient {
return Long.MAX_VALUE;
}
+ private boolean isNodeChanged(Node node) {
+ Node newNode = metadataUpdater.fetchNodes().stream()
+ .filter(n -> n.id() == node.id())
+ .findFirst().orElse(null);
+ return newNode == null || !newNode.equals(node);
+ }
+
public void handleResponse(GetTelemetrySubscriptionsResponse response)
{
clientTelemetrySender.handleResponse(response);
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index e8dcf5843dc..5ce40d306b4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -59,6 +59,7 @@ import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
@@ -1450,6 +1451,102 @@ public class NetworkClientTest {
}
}
+ @Test
+ public void testStickyNodeDoesNotUseStaleIpOnReconnect() throws
UnknownHostException {
+ String staleIp = "10.200.20.100";
+ String freshIp = "10.200.20.200";
+ // Both nodes share the same id to simulate a broker whose IP changed
(e.g. pod replacement).
+ Node staleNode = new Node(0, staleIp, 9092);
+ Node freshNode = new Node(0, freshIp, 9092);
+
+ List<InetSocketAddress> connectAttempts = new ArrayList<>();
+ // boolean array so the anonymous subclass can mutate it
+ boolean[] disconnectDuringPoll = {false};
+
+ // Custom selector that (a) captures every connect address and
+ // (b) can inject a server-side disconnect INSIDE selector.poll(),
simulating a
+ // disconnect detected after telemetrySender.maybeUpdate() already ran
with the channel ready.
+ MockSelector capturingSelector = new MockSelector(time) {
+ @Override
+ public void connect(String id, InetSocketAddress address, int
sendBufferSize, int receiveBufferSize)
+ throws IOException {
+ connectAttempts.add(address);
+ super.connect(id, address, sendBufferSize, receiveBufferSize);
+ }
+
+ @Override
+ public void poll(long timeout) throws IOException {
+ if (disconnectDuringPoll[0]) {
+ serverDisconnect("0");
+ disconnectDuringPoll[0] = false;
+ }
+ super.poll(timeout);
+ }
+ };
+
+ ClientTelemetrySender mockTelemetrySender =
mock(ClientTelemetrySender.class);
+ when(mockTelemetrySender.timeToNextUpdate(anyLong())).thenReturn(0L);
+ when(mockTelemetrySender.createRequest()).thenReturn(Optional.empty());
+
+ ManualMetadataUpdater updater = new
ManualMetadataUpdater(Collections.singletonList(staleNode));
+
+ NetworkClient testClient = new NetworkClient(
+ updater, null, capturingSelector, "test-client",
+ Integer.MAX_VALUE,
+ 0L, 0L, // reconnectBackoffMs = 0 for instant reconnect
+ 64 * 1024, 64 * 1024,
+ defaultRequestTimeoutMs,
+ connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest,
+ time, false, new ApiVersions(), null,
+ new LogContext(), new DefaultHostResolver(),
+ mockTelemetrySender, Long.MAX_VALUE,
+ MetadataRecoveryStrategy.NONE);
+
+ long now = time.milliseconds();
+
+ // Poll 1: stickyNode = null → staleNode; canSendRequest = false (not
yet connected);
+ // initiateConnect(staleNode); handleConnections →
connectionStates["0"] = READY
+ testClient.poll(0, now);
+ assertEquals(1, connectAttempts.size());
+ assertEquals(InetAddress.getByName(staleIp),
connectAttempts.get(0).getAddress());
+ connectAttempts.clear();
+
+ // Poll 2: stickyNode = null → staleNode; canSendRequest = TRUE
(READY);
+ // createRequest() = empty → stickyNode = staleNode KEPT (not
cleared)
+ testClient.poll(0, now);
+ assertTrue(connectAttempts.isEmpty(), "No new connect expected in poll
2");
+ assertEquals(staleNode, testClient.telemetryConnectedNode());
+
+ // Broker replaced: metadata now points to freshNode (new IP)
+ updater.setNodes(Collections.singletonList(freshNode));
+
+ // Schedule the disconnect to fire INSIDE selector.poll() on the next
NetworkClient.poll().
+ // This simulates a disconnect detected after
telemetrySender.maybeUpdate() already ran:
+ // - telemetrySender.maybeUpdate() runs first → channel still READY
at this moment
+ // → fix detects host mismatch → stickyNode updated to freshNode
+ // - selector.poll() triggers the disconnect
+ // - handleDisconnections() sets connectionStates["0"] = DISCONNECTED
+ disconnectDuringPoll[0] = true;
+
+ // Poll 3: fix detects that stickyNode's host differs from metadata →
stickyNode = freshNode;
+ // selector.poll() fires serverDisconnect("0");
+ // handleDisconnections → connectionStates["0"] = DISCONNECTED
+ testClient.poll(0, now);
+ assertTrue(connectAttempts.isEmpty(), "No new connect expected in poll
3");
+ assertEquals(freshNode, testClient.telemetryConnectedNode(),
+ "fix must update stickyNode to freshNode when host mismatch is
detected");
+
+ // Poll 4: stickyNode = freshNode, connectionStates["0"] = DISCONNECTED
+ // canSendRequest = false → stickyNode = null
+ // canConnect = true → initiateConnect with freshNode's IP
+ // FIXED: connects to freshIp (10.200.20.200)
+ testClient.poll(0, now);
+
+ assertFalse(connectAttempts.isEmpty(), "Expected a reconnect attempt
in poll 4");
+ assertEquals(InetAddress.getByName(freshIp),
connectAttempts.get(0).getAddress(),
+ "Reconnect must use the fresh IP from updated metadata, not
the stale IP from stickyNode");
+ }
+
// ManualMetadataUpdater with ability to keep track of failures
private static class TestMetadataUpdater extends ManualMetadataUpdater {
KafkaException failure;