Yu Wang created KAFKA-20393:
-------------------------------

             Summary: Kafka client sends to wrong IP caused by stickyNode in TelemetrySender
                 Key: KAFKA-20393
                 URL: https://issues.apache.org/jira/browse/KAFKA-20393
             Project: Kafka
          Issue Type: Bug
          Components: consumer, producer 
    Affects Versions: 3.9.1, 3.7.0
            Reporter: Yu Wang


In our case, our Kafka consumer connected to the wrong Kafka pod because the broker's IP had been reused by another Kafka cluster (e.g. ClusterA broker 1 was replaced, and its old IP was then assigned to ClusterB broker 2).

After checking the heap dump, we found that a stale telemetry stickyNode was still caching the old IP. This IP polluted the *connectionStates* of NetworkClient. As a result, when our consumer tried to fetch metadata, it got the wrong metadata from the wrong Kafka cluster.

We think two places in the code together cause this issue:

1. The {*}canSendRequest{*} check does not look at the stickyNode directly. It uses the node id to look up *connectionStates* and the *selector channel*, and checks those instead. In our case, at that moment these two maps held the new IP address while the stickyNode still held the old one, so the check passed and the stale IP stayed in stickyNode.
{code:java}
private long maybeUpdate(long now, Node node) {
    String nodeConnectionId = node.idString();

    if (canSendRequest(nodeConnectionId, now)) {
        ....
    }
    ....
}

private boolean canSendRequest(String node, long now) {
    return connectionStates.isReady(node, now) && selector.isChannelReady(node) &&
        inFlightRequests.canSendMore(node);
}
{code}
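To make point 1 concrete, here is a minimal, self-contained Java sketch. It is not Kafka code: *SimpleNode*, *ClientState* and *readyHostById* are hypothetical stand-ins for *Node*, *connectionStates* and the selector channels. It only shows that a readiness check keyed by the node id passes even when the cached node carries a different host.

```java
import java.util.HashMap;
import java.util.Map;

public class IdKeyedCheckDemo {
    // Hypothetical stand-in for org.apache.kafka.common.Node: an immutable id/host/port.
    static final class SimpleNode {
        final int id;
        final String host;
        final int port;

        SimpleNode(int id, String host, int port) {
            this.id = id;
            this.host = host;
            this.port = port;
        }

        String idString() {
            return Integer.toString(id);
        }
    }

    // Hypothetical stand-in for connectionStates plus the selector channels.
    // Ready connections are tracked by node id only.
    static final class ClientState {
        final Map<String, String> readyHostById = new HashMap<>();

        // Same shape as canSendRequest: only the id string is consulted, so the
        // host stored in the caller's node object is never compared.
        boolean canSendRequest(String nodeId) {
            return readyHostById.containsKey(nodeId);
        }
    }

    public static void main(String[] args) {
        ClientState state = new ClientState();
        // Broker 1 was replaced and the regular client path reconnected it at its new IP.
        state.readyHostById.put("1", "10.0.0.9");
        // The telemetry sticky node was cached before the replacement.
        SimpleNode stickyNode = new SimpleNode(1, "10.0.0.5", 9092);

        boolean passes = state.canSendRequest(stickyNode.idString());
        boolean sameHost = stickyNode.host.equals(state.readyHostById.get(stickyNode.idString()));
        // prints: canSendRequest=true, stickyHostMatches=false
        System.out.println("canSendRequest=" + passes + ", stickyHostMatches=" + sameHost);
    }
}
```

Because the id-only check passes, nothing in this path ever replaces the cached node object.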
2. When the stickyNode fails the {*}canSendRequest{*} check, stickyNode is set to null. However, the *connectionStates.canConnect* check still passes at this point, because it is also looked up by node id and the new IP is available in connectionStates.
{code:java}
if (connectionStates.canConnect(nodeConnectionId, now)) {
    // We don't have a connection to this node right now, make one
    log.debug("Initialize connection to node {} for sending telemetry request", node);
    initiateConnect(node, now);
    return reconnectBackoffMs;
}
{code}
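The control flow of point 2 can be sketched as follows. This is a hypothetical, simplified model, not Kafka code: *TelemetrySenderSketch*, the *State* enum and *dialedHost* are illustrative names, backoff and *isAnyNodeConnecting* are left out, and we assume the node id is in a disconnected state at this moment. It shows that setting the stickyNode field to null does not change the local *node* parameter, so the connect step still receives the stale host.

```java
import java.util.HashMap;
import java.util.Map;

public class StaleLocalNodeDemo {
    // Hypothetical, simplified connection states (backoff and timeouts omitted).
    enum State { CONNECTING, READY, DISCONNECTED }

    // Hypothetical stand-in for org.apache.kafka.common.Node.
    static final class SimpleNode {
        final int id;
        final String host;

        SimpleNode(int id, String host) {
            this.id = id;
            this.host = host;
        }

        String idString() {
            return Integer.toString(id);
        }
    }

    static final class TelemetrySenderSketch {
        final Map<String, State> stateById = new HashMap<>();
        SimpleNode stickyNode;
        String dialedHost; // host handed to the connect step, recorded for inspection

        boolean canSendRequest(String id) {
            return stateById.get(id) == State.READY;
        }

        // Like canConnect: keyed by id only; true if the id is unknown or disconnected.
        boolean canConnect(String id) {
            State s = stateById.get(id);
            return s == null || s == State.DISCONNECTED;
        }

        void initiateConnect(SimpleNode node) {
            stateById.put(node.idString(), State.CONNECTING);
            dialedHost = node.host;
        }

        // Mirrors the relevant branches of maybeUpdate, without building a request.
        void maybeUpdate(SimpleNode node) {
            String id = node.idString();
            if (canSendRequest(id)) {
                return;
            }
            stickyNode = null; // clears the field only
            if (canConnect(id)) {
                initiateConnect(node); // the parameter still points at the stale node
            }
        }
    }

    public static void main(String[] args) {
        TelemetrySenderSketch sender = new TelemetrySenderSketch();
        sender.stickyNode = new SimpleNode(1, "10.0.0.5"); // cached before broker 1 moved
        sender.stateById.put("1", State.DISCONNECTED);     // assumed state at this moment
        sender.maybeUpdate(sender.stickyNode);
        // prints: stickyNode=null, dialedHost=10.0.0.5
        System.out.println("stickyNode=" + sender.stickyNode + ", dialedHost=" + sender.dialedHost);
    }
}
```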
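This sends the code into {*}initiateConnect{*}. Clearing the stickyNode field does not change the local *node* parameter, so the method is still called with the stale node and uses the *host of the stickyNode* to initiate the connection. This pollutes the host stored in connectionStates with the stale IP.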
{code:java}
private void initiateConnect(Node node, long now) {
    String nodeConnectionId = node.idString();
    try {
        connectionStates.connecting(nodeConnectionId, now, node.host());
        InetAddress address = connectionStates.currentAddress(nodeConnectionId);
        log.debug("Initiating connection to node {} using address {}", node, address);
        selector.connect(nodeConnectionId,
                new InetSocketAddress(address, node.port()),
                this.socketSendBuffer,
                this.socketReceiveBuffer);
    } catch (IOException e) {
        ....
    }
}
{code}
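Our reading of *ClusterConnectionStates.connecting* is that it keeps the existing per-id entry only when the host is unchanged, and otherwise replaces it with a new entry for the host it was given. The hypothetical sketch below models just that rule (*ConnStatesSketch*, *Entry* and *currentHost* are illustrative names; address resolution, backoff and the real state machine are left out) to show how the last caller decides which host a node id maps to.

```java
import java.util.HashMap;
import java.util.Map;

public class HostOverwriteDemo {
    // Hypothetical per-node entry: the host this node id is currently dialed at.
    static final class Entry {
        final String host;
        String state;

        Entry(String host, String state) {
            this.host = host;
            this.state = state;
        }
    }

    static final class ConnStatesSketch {
        final Map<String, Entry> byId = new HashMap<>();

        // Keep the entry if the host is unchanged; otherwise replace it.
        // Either way, the host passed by the latest caller wins.
        void connecting(String id, String host) {
            Entry existing = byId.get(id);
            if (existing != null && existing.host.equals(host)) {
                existing.state = "CONNECTING";
                return;
            }
            byId.put(id, new Entry(host, "CONNECTING"));
        }

        String currentHost(String id) {
            Entry e = byId.get(id);
            return e == null ? null : e.host;
        }
    }

    public static void main(String[] args) {
        ConnStatesSketch states = new ConnStatesSketch();
        states.connecting("1", "10.0.0.9"); // regular path: broker 1 at its new IP
        states.connecting("1", "10.0.0.5"); // telemetry path: stale sticky node host
        // prints: node 1 host = 10.0.0.5
        System.out.println("node 1 host = " + states.currentHost("1"));
    }
}
```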
 
For reference, the full {*}maybeUpdate{*} method:
{code:java}
private long maybeUpdate(long now, Node node) {
    String nodeConnectionId = node.idString();

    if (canSendRequest(nodeConnectionId, now)) {
        Optional<AbstractRequest.Builder<?>> requestOpt = clientTelemetrySender.createRequest();

        if (!requestOpt.isPresent())
            return Long.MAX_VALUE;

        AbstractRequest.Builder<?> request = requestOpt.get();
        ClientRequest clientRequest = newClientRequest(nodeConnectionId, request, now, true);
        doSend(clientRequest, true, now);
        return defaultRequestTimeoutMs;
    } else {
        // Per KIP-714, if we can't issue a request to this broker node, let's clear it out
        // and try another broker on the next loop.
        stickyNode = null;
    }

    // If there's any connection establishment underway, wait until it completes. This prevents
    // the client from unnecessarily connecting to additional nodes while a previous connection
    // attempt has not been completed.
    if (isAnyNodeConnecting())
        return reconnectBackoffMs;

    if (connectionStates.canConnect(nodeConnectionId, now)) {
        // We don't have a connection to this node right now, make one
        log.debug("Initialize connection to node {} for sending telemetry request", node);
        initiateConnect(node, now);
        return reconnectBackoffMs;
    }

    // In either case, we just need to wait for a network event to let us know the selected
    // connection might be usable again.
    return Long.MAX_VALUE;
}
{code}
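One possible direction, sketched here only as our own hypothetical idea and not as an agreed or upstream fix: before reusing the stickyNode, look its id up in the current cluster metadata and drop it if the host or port changed, so the next loop picks a fresh node. *revalidate* and the *metadataById* map are illustrative stand-ins for whatever metadata lookup the real client would use.

```java
import java.util.HashMap;
import java.util.Map;

public class StickyNodeRevalidation {
    // Hypothetical stand-in for org.apache.kafka.common.Node.
    static final class SimpleNode {
        final int id;
        final String host;
        final int port;

        SimpleNode(int id, String host, int port) {
            this.id = id;
            this.host = host;
            this.port = port;
        }
    }

    // Returns the sticky node only if the current metadata still lists the same id at
    // the same host and port; otherwise returns null so the caller selects a new node.
    static SimpleNode revalidate(SimpleNode sticky, Map<Integer, SimpleNode> metadataById) {
        if (sticky == null) {
            return null;
        }
        SimpleNode current = metadataById.get(sticky.id);
        if (current == null) {
            return null; // the broker is no longer in the metadata
        }
        boolean unchanged = current.host.equals(sticky.host) && current.port == sticky.port;
        return unchanged ? sticky : null;
    }

    public static void main(String[] args) {
        Map<Integer, SimpleNode> metadataById = new HashMap<>();
        metadataById.put(1, new SimpleNode(1, "10.0.0.9", 9092)); // broker 1 after replacement

        SimpleNode stale = new SimpleNode(1, "10.0.0.5", 9092);   // cached before replacement
        SimpleNode fresh = metadataById.get(1);

        // prints:
        // stale kept: false
        // fresh kept: true
        System.out.println("stale kept: " + (revalidate(stale, metadataById) != null));
        System.out.println("fresh kept: " + (revalidate(fresh, metadataById) != null));
    }
}
```

Such a check would have to run before *maybeUpdate(now, node)* is called, so that neither *canSendRequest* nor *initiateConnect* ever sees the stale host.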
Finally, when the Kafka client tried to connect to the replaced broker again, it connected to the wrong IP. Even worse, it got metadata from another cluster.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
