This is an automated email from the ASF dual-hosted git repository.
kirktrue pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ede01b871ec KAFKA-20246: Detection and handling of misrouted
connections [1/N] (#21766)
ede01b871ec is described below
commit ede01b871ec93112e85589b3d70189ab97d06adf
Author: Andrew Schofield <[email protected]>
AuthorDate: Thu Apr 2 17:44:26 2026 +0100
KAFKA-20246: Detection and handling of misrouted connections [1/N] (#21766)
Implementation of
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1242%3A+Detection+and+handling+of+misrouted+connections.
---
.../java/org/apache/kafka/clients/CommonClientConfigs.java | 6 ++++++
.../org/apache/kafka/clients/admin/AdminClientConfig.java | 8 ++++++++
.../org/apache/kafka/clients/consumer/ConsumerConfig.java | 5 +++++
.../org/apache/kafka/clients/producer/ProducerConfig.java | 5 +++++
.../src/main/resources/common/message/ApiVersionsRequest.json | 11 +++++++++--
.../main/resources/common/message/ApiVersionsResponse.json | 4 +++-
.../test/java/org/apache/kafka/clients/NetworkClientTest.java | 4 ++--
.../kafka/connect/runtime/distributed/DistributedConfig.java | 10 +++++++++-
.../unit/kafka/server/DefaultApiVersionManagerTest.scala | 10 +++++++++-
.../src/main/java/org/apache/kafka/streams/StreamsConfig.java | 5 +++++
10 files changed, 61 insertions(+), 7 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index 08b861673e3..33b936b378d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -247,6 +247,12 @@ public class CommonClientConfigs {
"metadata for this interval, client repeats the bootstrap process
using <code>bootstrap.servers</code> configuration.";
public static final long DEFAULT_METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS
= 300 * 1000;
+ public static final String METADATA_CLUSTER_CHECK_ENABLE_CONFIG =
"metadata.cluster.check.enable";
+ public static final String METADATA_CLUSTER_CHECK_ENABLE_DOC = "Whether
the client should send cluster and node information " +
+ "when connecting to a broker to enable it to check for a misrouted
connection. This configuration is ignored if " +
+ "rebootstrapping is disabled by setting the configuration
<code>metadata.recovery.strategy=none</code>. If the client " +
+ "is connecting to a broker older than Apache Kafka 4.4, no
checking is performed and this configuration has no effect.";
+
/**
* Postprocess the configuration so that exponential backoff is disabled
when reconnect backoff
* is explicitly configured but the maximum reconnect backoff is not
explicitly configured.
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
index 471d3916cfb..39086886156 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
@@ -146,6 +146,9 @@ public class AdminClientConfig extends AbstractConfig {
public static final String METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC =
CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC;
public static final long DEFAULT_METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS
= CommonClientConfigs.DEFAULT_METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS;
+ public static final String METADATA_CLUSTER_CHECK_ENABLE_CONFIG =
CommonClientConfigs.METADATA_CLUSTER_CHECK_ENABLE_CONFIG;
+ public static final String METADATA_CLUSTER_CHECK_ENABLE_DOC =
CommonClientConfigs.METADATA_CLUSTER_CHECK_ENABLE_DOC;
+
/**
* <code>security.providers</code>
*/
@@ -285,6 +288,11 @@ public class AdminClientConfig extends AbstractConfig {
atLeast(0),
Importance.LOW,
METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC)
+ .define(METADATA_CLUSTER_CHECK_ENABLE_CONFIG,
+ Type.BOOLEAN,
+ true,
+ Importance.LOW,
+ METADATA_CLUSTER_CHECK_ENABLE_DOC)
.define(CONFIG_PROVIDERS_CONFIG,
ConfigDef.Type.LIST,
List.of(),
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index d8608e3d79d..4dd6002ece5 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -689,6 +689,11 @@ public class ConsumerConfig extends AbstractConfig {
atLeast(0),
Importance.LOW,
CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC)
+
.define(CommonClientConfigs.METADATA_CLUSTER_CHECK_ENABLE_CONFIG,
+ Type.BOOLEAN,
+ true,
+ Importance.LOW,
+
CommonClientConfigs.METADATA_CLUSTER_CHECK_ENABLE_DOC)
.define(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG,
Type.STRING,
ShareAcknowledgementMode.IMPLICIT.name(),
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index f9a99c5ef08..3ecc9e1e2cb 100644
---
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -558,6 +558,11 @@ public class ProducerConfig extends AbstractConfig {
atLeast(0),
Importance.LOW,
CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC)
+
.define(CommonClientConfigs.METADATA_CLUSTER_CHECK_ENABLE_CONFIG,
+ Type.BOOLEAN,
+ true,
+ Importance.LOW,
+
CommonClientConfigs.METADATA_CLUSTER_CHECK_ENABLE_DOC)
.define(CONFIG_PROVIDERS_CONFIG,
ConfigDef.Type.LIST,
List.of(),
diff --git a/clients/src/main/resources/common/message/ApiVersionsRequest.json
b/clients/src/main/resources/common/message/ApiVersionsRequest.json
index 56170c96673..2f13edd531b 100644
--- a/clients/src/main/resources/common/message/ApiVersionsRequest.json
+++ b/clients/src/main/resources/common/message/ApiVersionsRequest.json
@@ -23,12 +23,19 @@
// Version 3 is the first flexible version and adds ClientSoftwareName and
ClientSoftwareVersion.
//
// Version 4 fixes KAFKA-17011, which blocked SupportedFeatures.MinVersion
in the response from being 0.
- "validVersions": "0-4",
+ //
+ // Version 5 introduces ClusterId and NodeId checking and
REBOOTSTRAP_REQUIRED error (KIP-1242).
+ "validVersions": "0-5",
"flexibleVersions": "3+",
+ "latestVersionUnstable": true,
"fields": [
{ "name": "ClientSoftwareName", "type": "string", "versions": "3+",
"ignorable": true, "about": "The name of the client." },
{ "name": "ClientSoftwareVersion", "type": "string", "versions": "3+",
- "ignorable": true, "about": "The version of the client." }
+ "ignorable": true, "about": "The version of the client." },
+ { "name": "ClusterId", "type": "string", "versions": "5+",
"nullableVersions": "5+", "default": "null",
+ "ignorable": true, "about": "The cluster ID the client intends to
connect to, if known." },
+ { "name": "NodeId", "type": "int32", "versions": "5+", "default": -1,
+ "ignorable": true, "about": "The node ID the client intends to connect
to, if known." }
]
}
diff --git a/clients/src/main/resources/common/message/ApiVersionsResponse.json
b/clients/src/main/resources/common/message/ApiVersionsResponse.json
index 1017f244360..722bfa743a6 100644
--- a/clients/src/main/resources/common/message/ApiVersionsResponse.json
+++ b/clients/src/main/resources/common/message/ApiVersionsResponse.json
@@ -29,7 +29,9 @@
// versions of the ApiVersionsRequest when an UNSUPPORTED_VERSION error is
returned.
//
// Version 4 fixes KAFKA-17011, which blocked SupportedFeatures.MinVersion
from being 0.
- "validVersions": "0-4",
+ //
+ // Version 5 introduces ClusterId and NodeId checking and
REBOOTSTRAP_REQUIRED error (KIP-1242).
+ "validVersions": "0-5",
"flexibleVersions": "3+",
"fields": [
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
diff --git
a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index e8dcf5843dc..d3f52a0f575 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -463,7 +463,7 @@ public class NetworkClientTest {
ByteBuffer buffer = selector.completedSendBuffers().get(0).buffer();
RequestHeader header = parseHeader(buffer);
assertEquals(ApiKeys.API_VERSIONS, header.apiKey());
- assertEquals(4, header.apiVersion());
+ assertEquals(5, header.apiVersion());
// prepare response
ApiVersionCollection apiKeys = new ApiVersionCollection();
@@ -535,7 +535,7 @@ public class NetworkClientTest {
ByteBuffer buffer = selector.completedSendBuffers().get(0).buffer();
RequestHeader header = parseHeader(buffer);
assertEquals(ApiKeys.API_VERSIONS, header.apiKey());
- assertEquals(4, header.apiVersion());
+ assertEquals(5, header.apiVersion());
// prepare response
delayedApiVersionsResponse(0, (short) 0,
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
index ccf33926bf9..9ff90e7a516 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
@@ -106,6 +106,9 @@ public final class DistributedConfig extends WorkerConfig {
private static final String METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC =
CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC;
public static final long DEFAULT_METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS
= CommonClientConfigs.DEFAULT_METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS;
+ public static final String METADATA_CLUSTER_CHECK_ENABLE_CONFIG =
CommonClientConfigs.METADATA_CLUSTER_CHECK_ENABLE_CONFIG;
+ private static final String METADATA_CLUSTER_CHECK_ENABLE_DOC =
CommonClientConfigs.METADATA_CLUSTER_CHECK_ENABLE_DOC;
+
/**
* <code>worker.sync.timeout.ms</code>
*/
@@ -535,7 +538,12 @@ public final class DistributedConfig extends WorkerConfig {
DEFAULT_METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS,
atLeast(0),
ConfigDef.Importance.LOW,
- METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC);
+ METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC)
+ .define(METADATA_CLUSTER_CHECK_ENABLE_CONFIG,
+ ConfigDef.Type.BOOLEAN,
+ true,
+ ConfigDef.Importance.LOW,
+ METADATA_CLUSTER_CHECK_ENABLE_DOC);
}
diff --git
a/core/src/test/scala/unit/kafka/server/DefaultApiVersionManagerTest.scala
b/core/src/test/scala/unit/kafka/server/DefaultApiVersionManagerTest.scala
index 6e5c5bd8d1e..e26f035a2d2 100644
--- a/core/src/test/scala/unit/kafka/server/DefaultApiVersionManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DefaultApiVersionManagerTest.scala
@@ -82,7 +82,15 @@ class DefaultApiVersionManagerTest {
)
ApiKeys.apisForListener(apiScope).forEach { apiKey =>
- if (apiKey.messageType.latestVersionUnstable()) {
+ if (apiKey.id == ApiKeys.API_VERSIONS.id) {
+ // ApiVersions API is a particular case. The client always sends the
highest version
+ // that it supports and the server falls back to version 0 if it does
not know it.
+ // See ApiKeys.isVersionEnabled for more information (KIP-511).
+ // Because API_VERSIONS has an unstable version while KIP-1242 is
under development,
+ // we need a special case in this test. This assertion will start
failing once the
+ // API is no longer unstable, at which point the special case can be removed.
+ assertTrue(apiKey.messageType.latestVersionUnstable());
+ } else if (apiKey.messageType.latestVersionUnstable()) {
assertFalse(versionManager.isApiEnabled(apiKey, apiKey.latestVersion),
s"$apiKey version ${apiKey.latestVersion} should be disabled.")
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 2d860cd7905..083ca82e549 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -1256,6 +1256,11 @@ public class StreamsConfig extends AbstractConfig {
atLeast(0),
Importance.LOW,
CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC)
+ .define(CommonClientConfigs.METADATA_CLUSTER_CHECK_ENABLE_CONFIG,
+ Type.BOOLEAN,
+ true,
+ Importance.LOW,
+ CommonClientConfigs.METADATA_CLUSTER_CHECK_ENABLE_DOC)
.define(ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
Type.CLASS,
null,