[ https://issues.apache.org/jira/browse/KAFKA-20393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apoorv Mittal resolved KAFKA-20393.
-----------------------------------
    Resolution: Fixed

> Kafka client sends to wrong IP caused by stickyNode in TelemetrySender
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-20393
>                 URL: https://issues.apache.org/jira/browse/KAFKA-20393
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer, network, producer 
>    Affects Versions: 3.7.0, 3.9.1
>            Reporter: Yu Wang
>            Assignee: Daeho Kwon
>            Priority: Blocker
>             Fix For: 4.3.0, 4.2.1
>
>
> In our case, we saw our Kafka consumer connect to the wrong Kafka pod because 
> one Kafka broker was replaced in Kubernetes and its IP was reused by 
> another Kafka cluster (e.g. ClusterA broker 1 was replaced, then its old IP 
> was assigned to ClusterB broker 2).
> After checking the heap dump, we found that {*}a stale telemetry sticky node 
> still cached the old IP{*}. This IP polluted the *connectionStates* of 
> NetworkClient. As a result, when our consumer tried to fetch metadata, it 
> fetched the wrong metadata from the wrong Kafka cluster.
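
A minimal sketch of how a sticky node can go stale. It assumes a simplified model rather than Kafka's real classes: `BrokerNode`, `MetadataView`, and `TelemetryNodePicker` are hypothetical stand-ins for `Node`, the client's metadata cache, and the telemetry sender's sticky-node selection. The IP addresses are illustrative. In this model a metadata refresh replaces the entry for broker 1, but a node reference captured earlier keeps the old host while its id still matches the fresh entry.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for org.apache.kafka.common.Node (id, host, port only).
record BrokerNode(int id, String host, int port) {
    // Per the report, connection state is keyed by this string, not by the address.
    String idString() {
        return Integer.toString(id);
    }
}

// Hypothetical metadata cache: broker id -> most recently advertised node.
class MetadataView {
    private final Map<Integer, BrokerNode> nodesById = new HashMap<>();

    void update(BrokerNode node) {
        nodesById.put(node.id(), node);
    }

    BrokerNode nodeById(int id) {
        return nodesById.get(id);
    }
}

// Hypothetical model of a sender that selects a node once and then sticks to it.
class TelemetryNodePicker {
    private BrokerNode stickyNode;

    BrokerNode pick(MetadataView metadata, int preferredId) {
        if (stickyNode == null) {
            stickyNode = metadata.nodeById(preferredId); // snapshot of the node at this moment
        }
        return stickyNode; // later metadata updates are not re-read
    }

    // In this model, only an explicit reset drops the cached node.
    void reset() {
        stickyNode = null;
    }
}

class StaleStickyNodeDemo {
    public static void main(String[] args) {
        MetadataView metadata = new MetadataView();
        metadata.update(new BrokerNode(1, "10.0.0.5", 9092));

        TelemetryNodePicker picker = new TelemetryNodePicker();
        picker.pick(metadata, 1);

        // Broker 1 is replaced and comes back with a new IP (illustrative values).
        metadata.update(new BrokerNode(1, "10.0.0.9", 9092));

        System.out.println("sticky host:   " + picker.pick(metadata, 1).host()); // 10.0.0.5
        System.out.println("metadata host: " + metadata.nodeById(1).host());     // 10.0.0.9
    }
}
```

The stale and fresh nodes share the same id string, so any check keyed only by id cannot tell them apart.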
> We think two places cause this issue, both in this method: 
> [https://github.com/apache/kafka/blob/3.9.1/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L1312]
>  
> 1. When checking {*}canSendRequest{*}, the code does not check the stickyNode 
> itself. It uses the node id to look up *connectionStates* and the *selector 
> channel*, then checks those. In our case, at that moment these two maps held 
> the new IP address while the stickyNode held the old one, so the check 
> passed and the stale IP stayed in the stickyNode.
> {code:java}
> private long maybeUpdate(long now, Node node) {
>     String nodeConnectionId = node.idString();
>     if (canSendRequest(nodeConnectionId, now)) {
>         ....
>     }
> {code}
> {code:java}
> private boolean canSendRequest(String node, long now) {
>     return connectionStates.isReady(node, now) && selector.isChannelReady(node) &&
>         inFlightRequests.canSendMore(node);
> }
> {code}
> 2. When the stickyNode fails the {*}canSendRequest{*} check, the stickyNode is 
> set to null. However, at this point the *connectionStates.canConnect* check 
> still passes, because the new IP is available in connectionStates.
> {code:java}
> if (connectionStates.canConnect(nodeConnectionId, now)) {
>     // We don't have a connection to this node right now, make one
>     log.debug("Initialize connection to node {} for sending telemetry request", node);
>     initiateConnect(node, now);
>     return reconnectBackoffMs;
> }
> {code}
> As a result, the code goes to {*}initiateConnect{*}. That method uses the *host 
> of the stickyNode* to initiate the connection, which pollutes the host stored 
> in connectionStates with the stale IP.
> {code:java}
> private void initiateConnect(Node node, long now) {
>     String nodeConnectionId = node.idString();
>     try {
>         connectionStates.connecting(nodeConnectionId, now, node.host());
>         InetAddress address = connectionStates.currentAddress(nodeConnectionId);
>         log.debug("Initiating connection to node {} using address {}", node, address);
>         selector.connect(nodeConnectionId,
>             new InetSocketAddress(address, node.port()),
>             this.socketSendBuffer,
>             this.socketReceiveBuffer);
> {code}
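
The two paths above can be reproduced in a minimal sketch. `ConnectionStatesModel` and `TelemetryPathModel` are hypothetical, heavily simplified stand-ins for `ClusterConnectionStates` and the telemetry sender. They store only one state and one host per node id, and they omit the selector, in-flight, backoff, and DNS handling. The scenario follows the reported sequence. First, a ready connection to the new host lets a stale node pass the id-keyed readiness check. Then, once that connection drops, `canConnect` passes and `initiateConnect` writes the stale host back under the same id.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for org.apache.kafka.common.Node (id, host, port only).
record BrokerNode(int id, String host, int port) {
    String idString() {
        return Integer.toString(id);
    }
}

// Hypothetical, simplified model of ClusterConnectionStates:
// one state and one host per node id string, no backoff, no DNS resolution.
class ConnectionStatesModel {
    enum State { CONNECTING, READY, DISCONNECTED }

    private final Map<String, State> stateById = new HashMap<>();
    private final Map<String, String> hostById = new HashMap<>();

    boolean isReady(String id) {
        return stateById.get(id) == State.READY;
    }

    // Only the id is consulted; the caller's host never enters the decision.
    boolean canConnect(String id) {
        State state = stateById.get(id);
        return state == null || state == State.DISCONNECTED;
    }

    // Overwrites whatever host was previously recorded for this id.
    void connecting(String id, String host) {
        stateById.put(id, State.CONNECTING);
        hostById.put(id, host);
    }

    void ready(String id) {
        stateById.put(id, State.READY);
    }

    void disconnected(String id) {
        stateById.put(id, State.DISCONNECTED);
    }

    String currentHost(String id) {
        return hostById.get(id);
    }
}

// Hypothetical model of the telemetry path from items 1 and 2.
class TelemetryPathModel {
    final ConnectionStatesModel states = new ConnectionStatesModel();

    // Item 1: readiness is looked up by id, so a READY connection to the new host
    // makes a stale node look sendable.
    boolean canSendRequest(BrokerNode node) {
        return states.isReady(node.idString());
    }

    // Item 2: the connection is initiated with node.host(), so a stale node
    // records the old host under the same id.
    void initiateConnect(BrokerNode node) {
        states.connecting(node.idString(), node.host());
    }
}

class ConnectionPollutionDemo {
    public static void main(String[] args) {
        BrokerNode fresh = new BrokerNode(1, "10.0.0.9", 9092); // replacement broker (illustrative IP)
        BrokerNode stale = new BrokerNode(1, "10.0.0.5", 9092); // sticky node's cached copy

        TelemetryPathModel telemetry = new TelemetryPathModel();
        telemetry.states.connecting(fresh.idString(), fresh.host());
        telemetry.states.ready(fresh.idString());
        System.out.println("stale node passes item-1 check: " + telemetry.canSendRequest(stale)); // true

        telemetry.states.disconnected(fresh.idString());
        if (!telemetry.canSendRequest(stale) && telemetry.states.canConnect(stale.idString())) {
            telemetry.initiateConnect(stale);
        }
        System.out.println("host recorded for id 1: " + telemetry.states.currentHost("1")); // 10.0.0.5
    }
}
```

After the last step, any other path that looks up the address for id `1` sees the stale host. This matches the pollution of *connectionStates* described above.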
> Finally, when the Kafka client tried to connect to the replaced broker again, 
> it connected to the wrong IP and, even worse, got metadata from another 
> cluster.
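
The following sketches one possible mitigation. It is hypothetical and is not necessarily what the committed fix for KAFKA-20393 does. Before reusing the sticky node, compare it with the node that the current metadata lists for the same id. Drop the sticky node if its host or port changed or the broker is gone. `BrokerNode` and `StickyNodeGuard` are illustrative names, not Kafka APIs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for org.apache.kafka.common.Node (id, host, port only).
// Record equality compares all three components.
record BrokerNode(int id, String host, int port) {}

// Hypothetical guard: a sticky node is reused only while metadata still agrees with it.
class StickyNodeGuard {
    private final Map<Integer, BrokerNode> currentNodes = new HashMap<>();
    private BrokerNode stickyNode;

    void onMetadataUpdate(BrokerNode node) {
        currentNodes.put(node.id(), node);
    }

    void onBrokerRemoved(int id) {
        currentNodes.remove(id);
    }

    void stick(BrokerNode node) {
        stickyNode = node;
    }

    // Returns the sticky node only if metadata lists the same id at the same host and port.
    // Otherwise it clears the sticky node so the caller re-selects from fresh metadata.
    Optional<BrokerNode> validatedStickyNode() {
        if (stickyNode == null) {
            return Optional.empty();
        }
        BrokerNode current = currentNodes.get(stickyNode.id());
        if (!stickyNode.equals(current)) { // false when current is null or host/port changed
            stickyNode = null;
            return Optional.empty();
        }
        return Optional.of(stickyNode);
    }
}
```

The key design choice is comparing the full (id, host, port) value instead of only the id. In the report, id-only checks are what let the stale node through.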



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
