This is an automated email from the ASF dual-hosted git repository.

kirktrue pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ede01b871ec KAFKA-20246: Detection and handling of misrouted 
connections [1/N] (#21766)
ede01b871ec is described below

commit ede01b871ec93112e85589b3d70189ab97d06adf
Author: Andrew Schofield <[email protected]>
AuthorDate: Thu Apr 2 17:44:26 2026 +0100

    KAFKA-20246: Detection and handling of misrouted connections [1/N] (#21766)
    
    Implementation of
    
    
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1242%3A+Detection+and+handling+of+misrouted+connections.
---
 .../java/org/apache/kafka/clients/CommonClientConfigs.java    |  6 ++++++
 .../org/apache/kafka/clients/admin/AdminClientConfig.java     |  8 ++++++++
 .../org/apache/kafka/clients/consumer/ConsumerConfig.java     |  5 +++++
 .../org/apache/kafka/clients/producer/ProducerConfig.java     |  5 +++++
 .../src/main/resources/common/message/ApiVersionsRequest.json | 11 +++++++++--
 .../main/resources/common/message/ApiVersionsResponse.json    |  4 +++-
 .../test/java/org/apache/kafka/clients/NetworkClientTest.java |  4 ++--
 .../kafka/connect/runtime/distributed/DistributedConfig.java  | 10 +++++++++-
 .../unit/kafka/server/DefaultApiVersionManagerTest.scala      | 10 +++++++++-
 .../src/main/java/org/apache/kafka/streams/StreamsConfig.java |  5 +++++
 10 files changed, 61 insertions(+), 7 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index 08b861673e3..33b936b378d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -247,6 +247,12 @@ public class CommonClientConfigs {
             "metadata for this interval, client repeats the bootstrap process 
using <code>bootstrap.servers</code> configuration.";
     public static final long DEFAULT_METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS 
= 300 * 1000;
 
+    public static final String METADATA_CLUSTER_CHECK_ENABLE_CONFIG = 
"metadata.cluster.check.enable";
+    public static final String METADATA_CLUSTER_CHECK_ENABLE_DOC = "Whether 
the client should send cluster and node information " +
+            "when connecting to a broker to enable it to check for a misrouted 
connection. This configuration is ignored if " +
+            "rebootstrapping is disabled by setting the configuration 
<code>metadata.recovery.strategy=none</code>. If the client " +
+            "is connecting to a broker older than Apache Kafka 4.4, no 
checking is performed and this configuration has no effect.";
+
     /**
      * Postprocess the configuration so that exponential backoff is disabled 
when reconnect backoff
      * is explicitly configured but the maximum reconnect backoff is not 
explicitly configured.
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
index 471d3916cfb..39086886156 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
@@ -146,6 +146,9 @@ public class AdminClientConfig extends AbstractConfig {
     public static final String METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC = 
CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC;
     public static final long DEFAULT_METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS 
= CommonClientConfigs.DEFAULT_METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS;
 
+    public static final String METADATA_CLUSTER_CHECK_ENABLE_CONFIG = 
CommonClientConfigs.METADATA_CLUSTER_CHECK_ENABLE_CONFIG;
+    public static final String METADATA_CLUSTER_CHECK_ENABLE_DOC = 
CommonClientConfigs.METADATA_CLUSTER_CHECK_ENABLE_DOC;
+
     /**
      * <code>security.providers</code>
      */
@@ -285,6 +288,11 @@ public class AdminClientConfig extends AbstractConfig {
                                         atLeast(0),
                                         Importance.LOW,
                                         
METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC)
+                                .define(METADATA_CLUSTER_CHECK_ENABLE_CONFIG,
+                                        Type.BOOLEAN,
+                                        true,
+                                        Importance.LOW,
+                                        METADATA_CLUSTER_CHECK_ENABLE_DOC)
                                 .define(CONFIG_PROVIDERS_CONFIG, 
                                         ConfigDef.Type.LIST,
                                         List.of(),
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index d8608e3d79d..4dd6002ece5 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -689,6 +689,11 @@ public class ConsumerConfig extends AbstractConfig {
                                         atLeast(0),
                                         Importance.LOW,
                                         
CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC)
+                                
.define(CommonClientConfigs.METADATA_CLUSTER_CHECK_ENABLE_CONFIG,
+                                        Type.BOOLEAN,
+                                        true,
+                                        Importance.LOW,
+                                        
CommonClientConfigs.METADATA_CLUSTER_CHECK_ENABLE_DOC)
                                 
.define(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG,
                                         Type.STRING,
                                         
ShareAcknowledgementMode.IMPLICIT.name(),
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index f9a99c5ef08..3ecc9e1e2cb 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -558,6 +558,11 @@ public class ProducerConfig extends AbstractConfig {
                                         atLeast(0),
                                         Importance.LOW,
                                         
CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC)
+                                
.define(CommonClientConfigs.METADATA_CLUSTER_CHECK_ENABLE_CONFIG,
+                                        Type.BOOLEAN,
+                                        true,
+                                        Importance.LOW,
+                                        
CommonClientConfigs.METADATA_CLUSTER_CHECK_ENABLE_DOC)
                                 .define(CONFIG_PROVIDERS_CONFIG,
                                         ConfigDef.Type.LIST,
                                         List.of(),
diff --git a/clients/src/main/resources/common/message/ApiVersionsRequest.json 
b/clients/src/main/resources/common/message/ApiVersionsRequest.json
index 56170c96673..2f13edd531b 100644
--- a/clients/src/main/resources/common/message/ApiVersionsRequest.json
+++ b/clients/src/main/resources/common/message/ApiVersionsRequest.json
@@ -23,12 +23,19 @@
   // Version 3 is the first flexible version and adds ClientSoftwareName and 
ClientSoftwareVersion.
   //
   // Version 4 fixes KAFKA-17011, which blocked SupportedFeatures.MinVersion 
in the response from being 0.
-  "validVersions": "0-4",
+  //
+  // Version 5 introduces ClusterId and NodeId checking and 
REBOOTSTRAP_REQUIRED error (KIP-1242).
+  "validVersions": "0-5",
   "flexibleVersions": "3+",
+  "latestVersionUnstable": true,
   "fields": [
     { "name": "ClientSoftwareName", "type": "string", "versions": "3+",
       "ignorable": true, "about": "The name of the client." },
     { "name": "ClientSoftwareVersion", "type": "string", "versions": "3+",
-      "ignorable": true, "about": "The version of the client." }
+      "ignorable": true, "about": "The version of the client." },
+    { "name": "ClusterId", "type": "string", "versions": "5+", 
"nullableVersions": "5+", "default": "null",
+      "ignorable": true, "about": "The cluster ID the client intends to 
connect to, if known." },
+    { "name": "NodeId", "type": "int32", "versions": "5+", "default": -1,
+      "ignorable": true, "about": "The node ID the client intends to connect 
to, if known." }
   ]
 }
diff --git a/clients/src/main/resources/common/message/ApiVersionsResponse.json 
b/clients/src/main/resources/common/message/ApiVersionsResponse.json
index 1017f244360..722bfa743a6 100644
--- a/clients/src/main/resources/common/message/ApiVersionsResponse.json
+++ b/clients/src/main/resources/common/message/ApiVersionsResponse.json
@@ -29,7 +29,9 @@
   // versions of the ApiVersionsRequest when an UNSUPPORTED_VERSION error is 
returned.
   //
   // Version 4 fixes KAFKA-17011, which blocked SupportedFeatures.MinVersion 
from being 0.
-  "validVersions": "0-4",
+  //
+  // Version 5 introduces ClusterId and NodeId checking and 
REBOOTSTRAP_REQUIRED error (KIP-1242).
+  "validVersions": "0-5",
   "flexibleVersions": "3+",
   "fields": [
     { "name": "ErrorCode", "type": "int16", "versions": "0+",
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index e8dcf5843dc..d3f52a0f575 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -463,7 +463,7 @@ public class NetworkClientTest {
         ByteBuffer buffer = selector.completedSendBuffers().get(0).buffer();
         RequestHeader header = parseHeader(buffer);
         assertEquals(ApiKeys.API_VERSIONS, header.apiKey());
-        assertEquals(4, header.apiVersion());
+        assertEquals(5, header.apiVersion());
 
         // prepare response
         ApiVersionCollection apiKeys = new ApiVersionCollection();
@@ -535,7 +535,7 @@ public class NetworkClientTest {
         ByteBuffer buffer = selector.completedSendBuffers().get(0).buffer();
         RequestHeader header = parseHeader(buffer);
         assertEquals(ApiKeys.API_VERSIONS, header.apiKey());
-        assertEquals(4, header.apiVersion());
+        assertEquals(5, header.apiVersion());
 
         // prepare response
         delayedApiVersionsResponse(0, (short) 0,
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
index ccf33926bf9..9ff90e7a516 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
@@ -106,6 +106,9 @@ public final class DistributedConfig extends WorkerConfig {
     private static final String METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC = 
CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC;
     public static final long DEFAULT_METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS 
= CommonClientConfigs.DEFAULT_METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS;
 
+    public static final String METADATA_CLUSTER_CHECK_ENABLE_CONFIG = 
CommonClientConfigs.METADATA_CLUSTER_CHECK_ENABLE_CONFIG;
+    private static final String METADATA_CLUSTER_CHECK_ENABLE_DOC = 
CommonClientConfigs.METADATA_CLUSTER_CHECK_ENABLE_DOC;
+
     /**
      * <code>worker.sync.timeout.ms</code>
      */
@@ -535,7 +538,12 @@ public final class DistributedConfig extends WorkerConfig {
                     DEFAULT_METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS,
                     atLeast(0),
                     ConfigDef.Importance.LOW,
-                    METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC);
+                    METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC)
+            .define(METADATA_CLUSTER_CHECK_ENABLE_CONFIG,
+                    ConfigDef.Type.BOOLEAN,
+                    true,
+                    ConfigDef.Importance.LOW,
+                    METADATA_CLUSTER_CHECK_ENABLE_DOC);
 
     }
 
diff --git 
a/core/src/test/scala/unit/kafka/server/DefaultApiVersionManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/DefaultApiVersionManagerTest.scala
index 6e5c5bd8d1e..e26f035a2d2 100644
--- a/core/src/test/scala/unit/kafka/server/DefaultApiVersionManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DefaultApiVersionManagerTest.scala
@@ -82,7 +82,15 @@ class DefaultApiVersionManagerTest {
     )
 
     ApiKeys.apisForListener(apiScope).forEach { apiKey =>
-      if (apiKey.messageType.latestVersionUnstable()) {
+      if (apiKey.id == ApiKeys.API_VERSIONS.id) {
+        // ApiVersions API is a particular case. The client always send the 
highest version
+        // that it supports and the server fails back to version 0 if it does 
not know it.
+        // See ApiKeys.isVersionEnabled for more information (KIP-511).
+        // Because API_VERSIONS has an unstable version while KIP-1242 is 
under development,
+        // we need a special case in this test. This assertion will start 
failing when the
+        // API is no longer unstable and the special case can be removed.
+        assertTrue(apiKey.messageType.latestVersionUnstable());
+      } else if (apiKey.messageType.latestVersionUnstable()) {
         assertFalse(versionManager.isApiEnabled(apiKey, apiKey.latestVersion),
           s"$apiKey version ${apiKey.latestVersion} should be disabled.")
       }
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 2d860cd7905..083ca82e549 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -1256,6 +1256,11 @@ public class StreamsConfig extends AbstractConfig {
                     atLeast(0),
                     Importance.LOW,
                     
CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC)
+            .define(CommonClientConfigs.METADATA_CLUSTER_CHECK_ENABLE_CONFIG,
+                    Type.BOOLEAN,
+                    true,
+                    Importance.LOW,
+                    CommonClientConfigs.METADATA_CLUSTER_CHECK_ENABLE_DOC)
             .define(ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
                     Type.CLASS,
                     null,

Reply via email to