This is an automated email from the ASF dual-hosted git repository.

mittal pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.2 by this push:
     new 23c3da56e25 KAFKA-20393: Fix stickyNode using stale IP when broker 
address changes (#21983)
23c3da56e25 is described below

commit 23c3da56e257b9cf5e887ed0491413ac1f11e00d
Author: Daeho Kwon <[email protected]>
AuthorDate: Fri Apr 10 06:16:44 2026 +0900

    KAFKA-20393: Fix stickyNode using stale IP when broker address changes 
(#21983)
    
    When a broker's IP address changes (e.g., pod replacement in
    Kubernetes),
    `TelemetrySender.stickyNode` retains a stale `Node` object with the old
    address. Since `canSendRequest()` checks by node ID only, the stale
    connection passes the check and telemetry data is sent to the wrong
    host.
    
    This PR refreshes `stickyNode` against current metadata at the start of
    `TelemetrySender.maybeUpdate()`. If the node's address has changed,
    `stickyNode` is cleared so that a node is selected again from current
    metadata. If the node no longer exists in metadata, `stickyNode` is
    cleared and reconnect backoff is returned.
    
    Reviewers: Apoorv Mittal <[email protected]>
    
    ---------
    
    Signed-off-by: Daeho Kwon <[email protected]>
---
 .../org/apache/kafka/clients/NetworkClient.java    | 14 ++++
 .../apache/kafka/clients/NetworkClientTest.java    | 97 ++++++++++++++++++++++
 2 files changed, 111 insertions(+)

diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 692847a8b15..2194f22d63c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -1393,6 +1393,13 @@ public class NetworkClient implements KafkaClient {
             if (timeToNextUpdate > 0)
                 return timeToNextUpdate;
 
+            // A node's connection parameters can change while its node id stays the same, so check whether
+            // the cached sticky node still matches the current metadata; if it does not, reset the sticky node.
+            if (stickyNode != null && isNodeChanged(stickyNode)) {
+                log.debug("Telemetry stickyNode {} either is no longer in 
metadata or changed, clearing it.", stickyNode);
+                stickyNode = null;
+            }
+
             // Per KIP-714, let's continue to re-use the same broker for as 
long as possible.
             if (stickyNode == null) {
                 stickyNode = leastLoadedNode(now).node();
@@ -1442,6 +1449,13 @@ public class NetworkClient implements KafkaClient {
             return Long.MAX_VALUE;
         }
 
+        private boolean isNodeChanged(Node node) {
+            Node newNode = metadataUpdater.fetchNodes().stream()
+                    .filter(n -> n.id() == node.id())
+                    .findFirst().orElse(null);
+            return newNode == null || !newNode.equals(node);
+        }
+
         public void handleResponse(GetTelemetrySubscriptionsResponse response) 
{
             clientTelemetrySender.handleResponse(response);
         }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index e8dcf5843dc..5ce40d306b4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -59,6 +59,7 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
@@ -1450,6 +1451,102 @@ public class NetworkClientTest {
         }
     }
 
+    @Test
+    public void testStickyNodeDoesNotUseStaleIpOnReconnect() throws 
UnknownHostException {
+        String staleIp = "10.200.20.100";
+        String freshIp  = "10.200.20.200";
+        // Both nodes share the same id to simulate a broker whose IP changed 
(e.g. pod replacement).
+        Node staleNode  = new Node(0, staleIp, 9092);
+        Node freshNode  = new Node(0, freshIp, 9092);
+
+        List<InetSocketAddress> connectAttempts = new ArrayList<>();
+        // boolean array so the anonymous subclass can mutate it
+        boolean[] disconnectDuringPoll = {false};
+
+        // Custom selector that (a) captures every connect address and
+        // (b) can inject a server-side disconnect INSIDE selector.poll(), 
simulating a
+        // disconnect detected after telemetrySender.maybeUpdate() already ran 
with the channel ready.
+        MockSelector capturingSelector = new MockSelector(time) {
+            @Override
+            public void connect(String id, InetSocketAddress address, int 
sendBufferSize, int receiveBufferSize)
+                    throws IOException {
+                connectAttempts.add(address);
+                super.connect(id, address, sendBufferSize, receiveBufferSize);
+            }
+
+            @Override
+            public void poll(long timeout) throws IOException {
+                if (disconnectDuringPoll[0]) {
+                    serverDisconnect("0");
+                    disconnectDuringPoll[0] = false;
+                }
+                super.poll(timeout);
+            }
+        };
+
+        ClientTelemetrySender mockTelemetrySender = 
mock(ClientTelemetrySender.class);
+        when(mockTelemetrySender.timeToNextUpdate(anyLong())).thenReturn(0L);
+        when(mockTelemetrySender.createRequest()).thenReturn(Optional.empty());
+
+        ManualMetadataUpdater updater = new 
ManualMetadataUpdater(Collections.singletonList(staleNode));
+
+        NetworkClient testClient = new NetworkClient(
+                updater, null, capturingSelector, "test-client",
+                Integer.MAX_VALUE,
+                0L, 0L,   // reconnectBackoffMs = 0 for instant reconnect
+                64 * 1024, 64 * 1024,
+                defaultRequestTimeoutMs,
+                connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest,
+                time, false, new ApiVersions(), null,
+                new LogContext(), new DefaultHostResolver(),
+                mockTelemetrySender, Long.MAX_VALUE,
+                MetadataRecoveryStrategy.NONE);
+
+        long now = time.milliseconds();
+
+        // Poll 1: stickyNode = null → staleNode; canSendRequest = false (not 
yet connected);
+        //         initiateConnect(staleNode); handleConnections → 
connectionStates["0"] = READY
+        testClient.poll(0, now);
+        assertEquals(1, connectAttempts.size());
+        assertEquals(InetAddress.getByName(staleIp), 
connectAttempts.get(0).getAddress());
+        connectAttempts.clear();
+
+        // Poll 2: stickyNode = null → staleNode; canSendRequest = TRUE 
(READY);
+        //         createRequest() = empty → stickyNode = staleNode KEPT (not 
cleared)
+        testClient.poll(0, now);
+        assertTrue(connectAttempts.isEmpty(), "No new connect expected in poll 
2");
+        assertEquals(staleNode, testClient.telemetryConnectedNode());
+
+        // Broker replaced: metadata now points to freshNode (new IP)
+        updater.setNodes(Collections.singletonList(freshNode));
+
+        // Schedule the disconnect to fire INSIDE selector.poll() on the next 
NetworkClient.poll().
+        // This simulates a disconnect detected after 
telemetrySender.maybeUpdate() already ran:
+        //   - telemetrySender.maybeUpdate() runs first → channel still READY 
at this moment
+        //     → fix detects host mismatch → stickyNode updated to freshNode
+        //   - selector.poll() triggers the disconnect
+        //   - handleDisconnections() sets connectionStates["0"] = DISCONNECTED
+        disconnectDuringPoll[0] = true;
+
+        // Poll 3: fix detects that stickyNode's host differs from metadata → 
stickyNode = freshNode;
+        //         selector.poll() fires serverDisconnect("0");
+        //         handleDisconnections → connectionStates["0"] = DISCONNECTED
+        testClient.poll(0, now);
+        assertTrue(connectAttempts.isEmpty(), "No new connect expected in poll 
3");
+        assertEquals(freshNode, testClient.telemetryConnectedNode(),
+                "fix must update stickyNode to freshNode when host mismatch is 
detected");
+
+        // Poll 4: stickyNode = freshNode, connectionStates["0"] = DISCONNECTED
+        //   canSendRequest = false → stickyNode = null
+        //   canConnect = true → initiateConnect with freshNode's IP
+        //   FIXED: connects to freshIp (10.200.20.200)
+        testClient.poll(0, now);
+
+        assertFalse(connectAttempts.isEmpty(), "Expected a reconnect attempt 
in poll 4");
+        assertEquals(InetAddress.getByName(freshIp), 
connectAttempts.get(0).getAddress(),
+                "Reconnect must use the fresh IP from updated metadata, not 
the stale IP from stickyNode");
+    }
+
     // ManualMetadataUpdater with ability to keep track of failures
     private static class TestMetadataUpdater extends ManualMetadataUpdater {
         KafkaException failure;

Reply via email to