[
https://issues.apache.org/jira/browse/KAFKA-20393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Apoorv Mittal resolved KAFKA-20393.
-----------------------------------
Resolution: Fixed
> Kafka client send to wrong IP caused by stickyNode in TelemetrySender
> ---------------------------------------------------------------------
>
> Key: KAFKA-20393
> URL: https://issues.apache.org/jira/browse/KAFKA-20393
> Project: Kafka
> Issue Type: Bug
> Components: consumer, network, producer
> Affects Versions: 3.7.0, 3.9.1
> Reporter: Yu Wang
> Assignee: Daeho Kwon
> Priority: Blocker
> Fix For: 4.3.0, 4.2.1
>
>
> In our case, a Kafka consumer connected to the wrong Kafka pod because one
> Kafka broker was replaced in Kubernetes and its IP was reused by another
> Kafka cluster (e.g. ClusterA broker 1 was replaced, then its IP was assigned
> to ClusterB broker 2).
> After checking the heap dump, we found that {*}a stale telemetry sticky node
> always cached the old IP{*}. This IP polluted the *connectionStates* of
> NetworkClient. Finally, when our consumer tried to fetch metadata, it fetched
> the wrong metadata from the wrong Kafka cluster.
> We think two places cause this issue, both in this method:
> [https://github.com/apache/kafka/blob/3.9.1/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L1312]
>
> 1. When checking {*}canSendRequest{*}, the code does not check the stickyNode
> directly; it uses the node id to look up *connectionStates* and the *selector
> channel*, then performs the check. In our case, at that moment those two maps
> held the new IP address while stickyNode held the old one, so the check
> passed and the stale IP stayed in stickyNode.
> {code:java}
> private long maybeUpdate(long now, Node node) {
>     String nodeConnectionId = node.idString();
>     if (canSendRequest(nodeConnectionId, now))
> { .... } {code}
> {code:java}
> private boolean canSendRequest(String node, long now) {
>     return connectionStates.isReady(node, now) && selector.isChannelReady(node)
>         && inFlightRequests.canSendMore(node);
> } {code}
> 2. When the stickyNode fails the {*}canSendRequest{*} check, stickyNode is
> set to null. But at that point the *connectionStates.canConnect* check still
> passes, because the new IP is available in connectionStates.
> {code:java}
> if (connectionStates.canConnect(nodeConnectionId, now)) {
>     // We don't have a connection to this node right now, make one
>     log.debug("Initialize connection to node {} for sending telemetry request",
>         node);
>     initiateConnect(node, now);
>     return reconnectBackoffMs;
> } {code}
> This sends the code into {*}initiateConnect{*}. That method uses the *host of
> the stickyNode* to initiate the connection, which pollutes the host stored in
> connectionStates with the stale IP.
> {code:java}
> private void initiateConnect(Node node, long now) {
>     String nodeConnectionId = node.idString();
>     try {
>         connectionStates.connecting(nodeConnectionId, now, node.host());
>         InetAddress address = connectionStates.currentAddress(nodeConnectionId);
>         log.debug("Initiating connection to node {} using address {}", node,
>             address);
>         selector.connect(nodeConnectionId,
>             new InetSocketAddress(address, node.port()),
>             this.socketSendBuffer,
>             this.socketReceiveBuffer); {code}
> Finally, when the Kafka client tried to connect to the replaced broker again,
> it connected to the wrong IP and, even worse, got the wrong metadata from
> another cluster.
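
The two steps described above can be reproduced in isolation with a small
model. This is only a sketch under stated assumptions, not the NetworkClient
code: the Node and ConnectionStates classes below are hypothetical stand-ins
that keep just the property that matters here (state is keyed by node id, not
by host), and the IP addresses are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the behavior described in this issue.
// None of these classes are the real Kafka client classes.
final class StaleStickyNodeSketch {

    /** Stand-in for a broker node: an id plus the host it was resolved to. */
    public static final class Node {
        public final int id;
        public final String host;

        public Node(int id, String host) {
            this.id = id;
            this.host = host;
        }

        public String idString() {
            return Integer.toString(id);
        }
    }

    /** Stand-in for connection state tracking, keyed by node id only. */
    public static final class ConnectionStates {
        private final Map<String, String> hostById = new HashMap<>();
        private final Map<String, Boolean> readyById = new HashMap<>();

        public void connecting(String id, String host) {
            hostById.put(id, host); // whoever connects last decides the host
            readyById.put(id, false);
        }

        public void ready(String id) {
            readyById.put(id, true);
        }

        public void disconnected(String id) {
            readyById.put(id, false);
        }

        public boolean isReady(String id) {
            return readyById.getOrDefault(id, false);
        }

        public boolean canConnect(String id) {
            return !isReady(id);
        }

        public String host(String id) {
            return hostById.get(id);
        }
    }

    public static void main(String[] args) {
        ConnectionStates states = new ConnectionStates();
        Node sticky = new Node(1, "10.0.0.5");  // cached before the broker was replaced
        Node current = new Node(1, "10.0.0.9"); // same broker id, new IP

        // The regular client path connects to the new address.
        states.connecting(current.idString(), current.host());
        states.ready(current.idString());

        // Step 1: the readiness check only sees the id, so the stale node passes.
        System.out.println("stale node passes check: " + states.isReady(sticky.idString()));

        // Step 2: after a disconnect, connecting through the stale node
        // overwrites the stored host with the old IP.
        states.disconnected(sticky.idString());
        if (states.canConnect(sticky.idString())) {
            states.connecting(sticky.idString(), sticky.host());
        }
        System.out.println("host stored for node 1: " + states.host("1"));
    }
}
```

In this model the first check passes even though the sticky node still holds
the old host, and after the reconnect the state for node 1 holds the old IP
again, which matches the sequence reported in the description.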
--
This message was sent by Atlassian Jira
(v8.20.10#820010)