Yu Wang created KAFKA-20393:
-------------------------------
Summary: Kafka client sends to wrong IP caused by stickyNode in TelemetrySender
Key: KAFKA-20393
URL: https://issues.apache.org/jira/browse/KAFKA-20393
Project: Kafka
Issue Type: Bug
Components: consumer, producer
Affects Versions: 3.9.1, 3.7.0
Reporter: Yu Wang
In our case, we saw our Kafka consumer connect to the wrong Kafka pod because its IP had been reused by another Kafka cluster (e.g. ClusterA broker 1 was replaced, and its old IP was then assigned to ClusterB broker 2).
After checking the heap dump, we found a stale telemetry sticky node that still cached the old IP. This IP polluted the *connectionStates* of NetworkClient. As a result, when our consumer tried to fetch metadata, it got the wrong metadata from the wrong Kafka cluster.
We think two places in the code together cause this issue:
1. When checking {*}canSendRequest{*}, the code does not check the stickyNode directly; it uses the node id to look up *connectionStates* and the *selector channel* and then performs the check. In our case, at this moment these two maps held the new IP address while the stickyNode still held the old one, so the check passed and the stale IP stayed in the stickyNode.
{code:java}
private long maybeUpdate(long now, Node node) {
    String nodeConnectionId = node.idString();
    if (canSendRequest(nodeConnectionId, now)) {
        ....
    }
    ....
}

private boolean canSendRequest(String node, long now) {
    return connectionStates.isReady(node, now) &&
            selector.isChannelReady(node) &&
            inFlightRequests.canSendMore(node);
}
{code}
2. When the stickyNode fails the {*}canSendRequest{*} check, the code sets the stickyNode field to null, but the local *node* parameter still refers to the stale node. At this point the *connectionStates.canConnect* check still passes, because the lookup is by node id and connectionStates holds an entry (with the new IP) for that id.
{code:java}
if (connectionStates.canConnect(nodeConnectionId, now)) {
    // We don't have a connection to this node right now, make one
    log.debug("Initialize connection to node {} for sending telemetry request", node);
    initiateConnect(node, now);
    return reconnectBackoffMs;
}
{code}
This makes the code go into {*}initiateConnect{*}. That method uses the *host of the stickyNode* to initiate the connection, which overwrites the host stored in connectionStates with the stale IP.
{code:java}
private void initiateConnect(Node node, long now) {
    String nodeConnectionId = node.idString();
    try {
        connectionStates.connecting(nodeConnectionId, now, node.host());
        InetAddress address = connectionStates.currentAddress(nodeConnectionId);
        log.debug("Initiating connection to node {} using address {}", node, address);
        selector.connect(nodeConnectionId,
                new InetSocketAddress(address, node.port()),
                this.socketSendBuffer,
                this.socketReceiveBuffer);
        ....
    }
    ....
}
{code}
For reference, the complete *maybeUpdate* method:
{code:java}
private long maybeUpdate(long now, Node node) {
    String nodeConnectionId = node.idString();

    if (canSendRequest(nodeConnectionId, now)) {
        Optional<AbstractRequest.Builder<?>> requestOpt = clientTelemetrySender.createRequest();

        if (!requestOpt.isPresent())
            return Long.MAX_VALUE;

        AbstractRequest.Builder<?> request = requestOpt.get();
        ClientRequest clientRequest = newClientRequest(nodeConnectionId, request, now, true);
        doSend(clientRequest, true, now);
        return defaultRequestTimeoutMs;
    } else {
        // Per KIP-714, if we can't issue a request to this broker node, let's clear it out
        // and try another broker on the next loop.
        stickyNode = null;
    }

    // If there's any connection establishment underway, wait until it completes. This prevents
    // the client from unnecessarily connecting to additional nodes while a previous connection
    // attempt has not been completed.
    if (isAnyNodeConnecting())
        return reconnectBackoffMs;

    if (connectionStates.canConnect(nodeConnectionId, now)) {
        // We don't have a connection to this node right now, make one
        log.debug("Initialize connection to node {} for sending telemetry request", node);
        initiateConnect(node, now);
        return reconnectBackoffMs;
    }

    // In either case, we just need to wait for a network event to let us know the selected
    // connection might be usable again.
    return Long.MAX_VALUE;
}
{code}
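The second point can be condensed into a small, hypothetical Java (16+) model that follows the branch structure of *maybeUpdate* as quoted in this report. All names here (*StaleHostReconnectDemo*, *NodeState*, *State*) are illustrative, not Kafka's real classes, and the sketch makes simplifying assumptions: backoff and the *isAnyNodeConnecting* check are omitted, and *connecting* always stores the host it is given, which is the overwrite behaviour the report describes. It shows that clearing the *stickyNode* field does not affect the *node* parameter, so the stale host ends up in the per-id connection state.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the reconnect path in maybeUpdate(); not Kafka's real classes.
public class StaleHostReconnectDemo {

    // Stand-in for a broker node: an id plus the host it was created with.
    record Node(int id, String host) {
        String idString() {
            return Integer.toString(id);
        }
    }

    enum State { CONNECTING, CONNECTED, DISCONNECTED }

    // Per-id connection state: the state plus the host it is (or will be) connected to.
    static final class NodeState {
        final State state;
        final String host;

        NodeState(State state, String host) {
            this.state = state;
            this.host = host;
        }
    }

    static final Map<String, NodeState> connectionStates = new HashMap<>();
    static Node stickyNode;

    // Keyed by id only, like canSendRequest(String node, long now).
    static boolean canSendRequest(String id) {
        NodeState s = connectionStates.get(id);
        return s != null && s.state == State.CONNECTED;
    }

    // Keyed by id only; backoff is ignored in this model.
    static boolean canConnect(String id) {
        NodeState s = connectionStates.get(id);
        return s == null || s.state == State.DISCONNECTED;
    }

    // Assumption (per the report): the host passed in replaces the stored one.
    static void connecting(String id, String host) {
        connectionStates.put(id, new NodeState(State.CONNECTING, host));
    }

    static void initiateConnect(Node node) {
        connecting(node.idString(), node.host());
    }

    static void maybeUpdate(Node node) {
        String id = node.idString();
        if (canSendRequest(id)) {
            return; // the real code would send the telemetry request here
        }
        stickyNode = null;          // clears the field only
        if (canConnect(id)) {
            initiateConnect(node);  // 'node' still refers to the stale Node
        }
    }

    public static void main(String[] args) {
        // The connection to broker 1 at its new IP has just dropped.
        connectionStates.put("1", new NodeState(State.DISCONNECTED, "10.0.0.2"));
        // The sticky node still holds the replaced broker's old IP.
        stickyNode = new Node(1, "10.0.0.1");

        maybeUpdate(stickyNode);

        System.out.println("stickyNode: " + stickyNode);                                 // prints "stickyNode: null"
        System.out.println("host stored for node 1: " + connectionStates.get("1").host); // prints "host stored for node 1: 10.0.0.1"
    }
}
```

In this model the next connection attempt for node 1 would target the old IP, which matches the symptom described below.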
Finally, when the Kafka client tried to connect to the replaced broker again, it connected to the wrong IP; even worse, it got metadata from another cluster.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)