This is an automated email from the ASF dual-hosted git repository.
jsancio pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new eb51b968f49 KAFKA-19851; Delete dynamic configs that were removed by
Kafka (#21053)
eb51b968f49 is described below
commit eb51b968f497edb778a118c3f22e7f153bfc1ef2
Author: zhiyan-tang <[email protected]>
AuthorDate: Thu May 7 13:21:38 2026 -0500
KAFKA-19851; Delete dynamic configs that were removed by Kafka (#21053)
When upgrading from Kafka 3.x to 4.0, the metadata log may contain
dynamic configurations that were removed in 4.0 (e.g.,
message.format.version per KIP-724). These removed configs cause
InvalidConfigurationException when users attempt to modify any
configuration, because validation checks all existing configs including
the removed ones.
This change adds filtering to prevent unsupported or invalid configurations
from being applied during metadata replay. The filtering is implemented using
a SupportedConfigChecker interface that is supplied through the existing
Builder classes. When a ConfigRecord is replayed, the
checker validates whether the configuration name is supported for the
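given resource type. Unsupported configurations are skipped during
replay (ConfigurationControlManager logs each skipped record at INFO
level; ConfigurationDelta drops it without logging), ensuring that only
valid configurations enter the in-memory state.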
The SupportedConfigChecker interface provides a default TRUE
implementation that accepts all configurations. The actual filtering
logic is implemented by DefaultSupportedConfigChecker, which maintains a
whitelist of valid configuration names per resource type (TOPIC,
CLIENT_METRICS, GROUP) based on the actual config definitions. The
filtering occurs in both ConfigurationDelta#replay and
ConfigurationControlManager#replay methods.
Added unit tests to ensure:
- Removed configurations are filtered during the replay operations
- Only supported configurations appear in the resulting metadata images
- The filtering works correctly for all resource types (TOPIC, BROKER,
CLIENT_METRICS, GROUP)
- DefaultSupportedConfigChecker correctly identifies supported vs
unsupported configurations for each resource type
Reviewers: José Armando García Sancio <[email protected]>, Jun Rao
<[email protected]>, Alyssa Huang <[email protected]>, Kevin Wu
<[email protected]>, Andrew Grant <[email protected]>
(cherry picked from commit a35d649)
(cherry picked from commit 3be19e4)
(cherry picked from commit 90ad891)
---
build.gradle | 1 +
checkstyle/import-control.xml | 1 +
.../common/runtime/CoordinatorRuntimeTest.java | 6 +-
.../main/scala/kafka/server/ControllerServer.scala | 1 +
.../src/main/scala/kafka/server/SharedServer.scala | 8 ++-
.../kafka/server/LocalLeaderEndPointTest.scala | 8 ++-
.../unit/kafka/server/ApiVersionManagerTest.scala | 4 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 8 ++-
.../metadata/BrokerMetadataPublisherTest.scala | 8 ++-
.../coordinator/group/GroupCoordinatorShard.java | 4 +-
.../group/GroupMetadataManagerTest.java | 16 +++--
.../coordinator/group/MetadataImageBuilder.java | 4 +-
.../kafka/jmh/assignor/AssignorBenchmarkUtils.java | 4 +-
.../jmh/coordinator/RegexResolutionBenchmark.java | 4 +-
.../TransactionalOffsetFetchBenchmark.java | 4 +-
.../metadata/KRaftMetadataRequestBenchmark.java | 4 +-
.../controller/ConfigurationControlManager.java | 22 ++++++-
.../apache/kafka/controller/QuorumController.java | 10 +++
.../org/apache/kafka/image/ConfigurationDelta.java | 13 +++-
.../apache/kafka/image/ConfigurationsDelta.java | 11 ++--
.../java/org/apache/kafka/image/MetadataDelta.java | 16 ++++-
.../kafka/image/loader/MetadataBatchLoader.java | 16 ++++-
.../apache/kafka/image/loader/MetadataLoader.java | 24 ++++++-
.../kafka/metadata/SupportedConfigChecker.java | 40 +++++++++++
.../ConfigurationControlManagerTest.java | 67 +++++++++++++++++++
.../kafka/controller/QuorumControllerTest.java | 3 +-
.../ControllerMetadataMetricsPublisherTest.java | 4 +-
.../kafka/image/ConfigurationsImageTest.java | 45 ++++++++++++-
.../org/apache/kafka/image/MetadataImageTest.java | 2 +-
.../image/loader/MetadataBatchLoaderTest.java | 22 +++++--
.../kafka/image/loader/MetadataLoaderTest.java | 75 +++++++++++++++++++++
.../config/DefaultSupportedConfigChecker.java | 77 ++++++++++++++++++++++
.../config/DefaultSupportedConfigCheckerTest.java | 65 ++++++++++++++++++
.../java/org/apache/kafka/shell/MetadataShell.java | 5 ++
34 files changed, 552 insertions(+), 50 deletions(-)
diff --git a/build.gradle b/build.gradle
index 813544ca12f..7ef2126273f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2591,6 +2591,7 @@ project(':shell') {
implementation project(':core')
implementation project(':metadata')
implementation project(':raft')
+ implementation project(':server')
implementation libs.jose4j // for SASL/OAUTHBEARER JWT
validation
implementation libs.jacksonJakartarsJsonProvider
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index ab6ebff7433..dc1aea05ffc 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -277,6 +277,7 @@
<allow pkg="org.apache.kafka.queue"/>
<allow pkg="org.apache.kafka.raft"/>
<allow pkg="org.apache.kafka.server.common" />
+ <allow pkg="org.apache.kafka.server.config" />
<allow pkg="org.apache.kafka.server.fault" />
<allow pkg="org.apache.kafka.server.util" />
<allow pkg="org.apache.kafka.shell"/>
diff --git
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
index 6230e382f12..c85955ff397 100644
---
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
+++
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
@@ -107,7 +107,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-@SuppressWarnings("checkstyle:ClassDataAbstractionCoupling")
+@SuppressWarnings({"checkstyle:ClassDataAbstractionCoupling",
"checkstyle:ClassFanOutComplexity"})
public class CoordinatorRuntimeTest {
private static final TopicPartition TP = new
TopicPartition("__consumer_offsets", 0);
private static final Duration DEFAULT_WRITE_TIMEOUT = Duration.ofMillis(5);
@@ -2565,7 +2565,9 @@ public class CoordinatorRuntimeTest {
verify(coordinator0).onLoaded(MetadataImage.EMPTY);
// Publish a new image.
- MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+ MetadataDelta delta = new MetadataDelta.Builder()
+ .setImage(MetadataImage.EMPTY)
+ .build();
MetadataImage newImage = delta.apply(MetadataProvenance.EMPTY);
runtime.onNewMetadataImage(newImage, delta);
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala
b/core/src/main/scala/kafka/server/ControllerServer.scala
index 1933a55dfeb..1fb7daf20d8 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -234,6 +234,7 @@ class ControllerServer(
setCreateTopicPolicy(createTopicPolicy.toJava).
setAlterConfigPolicy(alterConfigPolicy.toJava).
setConfigurationValidator(new
ControllerConfigurationValidator(sharedServer.brokerConfig)).
+ setSupportedConfigChecker(sharedServer.supportedConfigChecker).
setStaticConfig(config.originals).
setBootstrapMetadata(bootstrapMetadata).
setFatalFaultHandler(sharedServer.fatalQuorumControllerFaultHandler).
diff --git a/core/src/main/scala/kafka/server/SharedServer.scala
b/core/src/main/scala/kafka/server/SharedServer.scala
index 69d2353fb83..53ab8114104 100644
--- a/core/src/main/scala/kafka/server/SharedServer.scala
+++ b/core/src/main/scala/kafka/server/SharedServer.scala
@@ -30,11 +30,11 @@ import org.apache.kafka.image.loader.MetadataLoader
import org.apache.kafka.image.loader.metrics.MetadataLoaderMetrics
import org.apache.kafka.image.publisher.metrics.SnapshotEmitterMetrics
import org.apache.kafka.image.publisher.{SnapshotEmitter, SnapshotGenerator}
-import org.apache.kafka.metadata.ListenerInfo
-import org.apache.kafka.metadata.MetadataRecordSerde
+import org.apache.kafka.metadata.{SupportedConfigChecker, ListenerInfo,
MetadataRecordSerde}
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble
import org.apache.kafka.raft.Endpoints
import org.apache.kafka.server.{ProcessRole, ServerSocketFactory}
+import org.apache.kafka.server.config.DefaultSupportedConfigChecker
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.fault.{FaultHandler, LoggingFaultHandler,
ProcessTerminatingFaultHandler}
import org.apache.kafka.server.metrics.{BrokerServerMetrics,
KafkaYammerMetrics}
@@ -112,6 +112,7 @@ class SharedServer(
private var usedByController: Boolean = false
val brokerConfig = new KafkaConfig(sharedServerConfig.props, false)
val controllerConfig = new KafkaConfig(sharedServerConfig.props, false)
+ val supportedConfigChecker: SupportedConfigChecker = new
DefaultSupportedConfigChecker()
@volatile var metrics: Metrics = _metrics
@volatile var raftManager: KafkaRaftManager[ApiMessageAndVersion] = _
@volatile var brokerMetrics: BrokerServerMetrics = _
@@ -315,7 +316,8 @@ class SharedServer(
setThreadNamePrefix(s"kafka-${sharedServerConfig.nodeId}-").
setFaultHandler(metadataLoaderFaultHandler).
setHighWaterMarkAccessor(() => _raftManager.client.highWatermark()).
- setMetrics(metadataLoaderMetrics)
+ setMetrics(metadataLoaderMetrics).
+ setSupportedConfigChecker(supportedConfigChecker)
loader = loaderBuilder.build()
snapshotEmitter = new SnapshotEmitter.Builder().
setNodeId(nodeId).
diff --git a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
index 4b9e7569b6e..62ef2a0b0f8 100644
--- a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
+++ b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
@@ -74,7 +74,9 @@ class LocalLeaderEndPointTest extends Logging {
alterPartitionManager = alterPartitionManager
)
- val delta = new MetadataDelta(MetadataImage.EMPTY)
+ val delta = new MetadataDelta.Builder()
+ .setImage(MetadataImage.EMPTY)
+ .build()
delta.replay(new FeatureLevelRecord()
.setName(MetadataVersion.FEATURE_NAME)
.setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel())
@@ -250,7 +252,9 @@ class LocalLeaderEndPointTest extends Logging {
}
private def bumpLeaderEpoch(): Unit = {
- val delta = new MetadataDelta(image)
+ val delta = new MetadataDelta.Builder()
+ .setImage(image)
+ .build()
delta.replay(new PartitionChangeRecord()
.setTopicId(topicId)
.setPartitionId(partition)
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala
index 19a95ca9450..c4293249224 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala
@@ -35,7 +35,9 @@ class ApiVersionManagerTest {
private val brokerFeatures = BrokerFeatures.createDefault(true)
private val metadataCache = {
val cache = MetadataCache.kRaftMetadataCache(1, () =>
KRaftVersion.LATEST_PRODUCTION)
- val delta = new MetadataDelta(MetadataImage.EMPTY);
+ val delta = new MetadataDelta.Builder()
+ .setImage(MetadataImage.EMPTY)
+ .build()
delta.replay(new FeatureLevelRecord()
.setName(MetadataVersion.FEATURE_NAME)
.setFeatureLevel(MetadataVersion.latestProduction().featureLevel())
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index bb1f4105ba8..6cc31810882 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -9767,7 +9767,9 @@ class KafkaApisTest extends Logging {
val requestChannelRequest = buildRequest(new
ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest).build())
metadataCache = {
val cache = MetadataCache.kRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_1)
- val delta = new MetadataDelta(MetadataImage.EMPTY);
+ val delta = new MetadataDelta.Builder()
+ .setImage(MetadataImage.EMPTY)
+ .build()
delta.replay(new FeatureLevelRecord()
.setName(MetadataVersion.FEATURE_NAME)
.setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel())
@@ -9997,7 +9999,9 @@ class KafkaApisTest extends Logging {
expectedResponse.groups.add(expectedDescribedGroup)
metadataCache = {
val cache = MetadataCache.kRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_1)
- val delta = new MetadataDelta(MetadataImage.EMPTY);
+ val delta = new MetadataDelta.Builder()
+ .setImage(MetadataImage.EMPTY)
+ .build()
delta.replay(new FeatureLevelRecord()
.setName(MetadataVersion.FEATURE_NAME)
.setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel())
diff --git
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
index c5f4d3187e3..844238e12a9 100644
---
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
+++
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
@@ -210,7 +210,9 @@ class BrokerMetadataPublisherTest {
)
val topicId = Uuid.randomUuid()
- var delta = new MetadataDelta(MetadataImage.EMPTY)
+ var delta = new MetadataDelta.Builder()
+ .setImage(MetadataImage.EMPTY)
+ .build()
delta.replay(new TopicRecord()
.setName(Topic.GROUP_METADATA_TOPIC_NAME)
.setTopicId(topicId)
@@ -227,7 +229,9 @@ class BrokerMetadataPublisherTest {
)
val image = delta.apply(MetadataProvenance.EMPTY)
- delta = new MetadataDelta(image)
+ delta = new MetadataDelta.Builder()
+ .setImage(image)
+ .build()
delta.replay(new RemoveTopicRecord()
.setTopicId(topicId)
)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index 74ccacbf5fa..23e7efa7402 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -741,7 +741,9 @@ public class GroupCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
*/
@Override
public void onLoaded(MetadataImage newImage) {
- MetadataDelta emptyDelta = new MetadataDelta(newImage);
+ MetadataDelta emptyDelta = new MetadataDelta.Builder()
+ .setImage(newImage)
+ .build();
groupMetadataManager.onNewMetadataImage(newImage, emptyDelta);
coordinatorMetrics.activateMetricsShard(metricsShard);
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index ea73b44590c..3445d6d8282 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -2950,7 +2950,9 @@ public class GroupMetadataManagerTest {
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(new
MockPartitionAssignor("range")))
.build();
- MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+ MetadataDelta delta = new MetadataDelta.Builder()
+ .setImage(MetadataImage.EMPTY)
+ .build();
MetadataImage image = delta.apply(MetadataProvenance.EMPTY);
context.groupMetadataManager.onNewMetadataImage(image, delta);
@@ -3007,7 +3009,9 @@ public class GroupMetadataManagerTest {
Uuid topicE = Uuid.randomUuid();
// Create a first base image with topic a, b, c and d.
- MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+ MetadataDelta delta = new MetadataDelta.Builder()
+ .setImage(MetadataImage.EMPTY)
+ .build();
delta.replay(new TopicRecord().setTopicId(topicA).setName("a"));
delta.replay(new
PartitionRecord().setTopicId(topicA).setPartitionId(0));
delta.replay(new TopicRecord().setTopicId(topicB).setName("b"));
@@ -3019,7 +3023,9 @@ public class GroupMetadataManagerTest {
MetadataImage image = delta.apply(MetadataProvenance.EMPTY);
// Create a delta which updates topic B, deletes topic D and creates
topic E.
- delta = new MetadataDelta(image);
+ delta = new MetadataDelta.Builder()
+ .setImage(image)
+ .build();
delta.replay(new
PartitionRecord().setTopicId(topicB).setPartitionId(2));
delta.replay(new RemoveTopicRecord().setTopicId(topicD));
delta.replay(new TopicRecord().setTopicId(topicE).setName("e"));
@@ -15995,7 +16001,9 @@ public class GroupMetadataManagerTest {
context.groupMetadataManager.onNewMetadataImage(
newImage,
- new MetadataDelta(newImage)
+ new MetadataDelta.Builder()
+ .setImage(newImage)
+ .build()
);
// A member heartbeats.
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MetadataImageBuilder.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MetadataImageBuilder.java
index 23a01a60241..0d5a26a8dc0 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MetadataImageBuilder.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MetadataImageBuilder.java
@@ -34,7 +34,9 @@ public class MetadataImageBuilder {
}
public MetadataImageBuilder(MetadataImage image) {
- this.delta = new MetadataDelta(image);
+ this.delta = new MetadataDelta.Builder()
+ .setImage(image)
+ .build();
}
public MetadataImageBuilder addTopic(
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java
index 9402728e1eb..325aa8bcbfd 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java
@@ -139,7 +139,9 @@ public class AssignorBenchmarkUtils {
* subscription metadata.
*/
public static TopicsImage createTopicsImage(Map<String, TopicMetadata>
subscriptionMetadata) {
- MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+ MetadataDelta delta = new MetadataDelta.Builder()
+ .setImage(MetadataImage.EMPTY)
+ .build();
for (Map.Entry<String, TopicMetadata> entry :
subscriptionMetadata.entrySet()) {
TopicMetadata topicMetadata = entry.getValue();
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/RegexResolutionBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/RegexResolutionBenchmark.java
index 35ee42836c8..79d16ab2c55 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/RegexResolutionBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/RegexResolutionBenchmark.java
@@ -95,7 +95,9 @@ public class RegexResolutionBenchmark {
public void setup() {
Random random = new Random();
- MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+ MetadataDelta delta = new MetadataDelta.Builder()
+ .setImage(MetadataImage.EMPTY)
+ .build();
for (int i = 0; i < topicCount; i++) {
String topicName =
WORDS.get(random.nextInt(WORDS.size())) + "_" +
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/TransactionalOffsetFetchBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/TransactionalOffsetFetchBenchmark.java
index b1986f639a7..fed08a4ebe3 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/TransactionalOffsetFetchBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/TransactionalOffsetFetchBenchmark.java
@@ -82,7 +82,9 @@ public class TransactionalOffsetFetchBenchmark {
@Setup(Level.Trial)
public void setup() {
LogContext logContext = new LogContext();
- MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+ MetadataDelta delta = new MetadataDelta.Builder()
+ .setImage(MetadataImage.EMPTY)
+ .build();
delta.replay(new TopicRecord()
.setTopicId(Uuid.randomUuid())
.setName(TOPIC_NAME));
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
index d58b35942fe..6b2db784de8 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
@@ -137,7 +137,9 @@ public class KRaftMetadataRequestBenchmark {
}
private void initializeMetadataCache() {
- MetadataDelta buildupMetadataDelta = new
MetadataDelta(MetadataImage.EMPTY);
+ MetadataDelta buildupMetadataDelta = new MetadataDelta.Builder()
+ .setImage(MetadataImage.EMPTY)
+ .build();
IntStream.range(0, 5).forEach(brokerId -> {
RegisterBrokerRecord.BrokerEndpointCollection endpoints = new
RegisterBrokerRecord.BrokerEndpointCollection();
endpoints(brokerId).forEach(endpoint -> endpoints.add(endpoint));
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
index 1ee2dcbd2a3..7cfcaadcf18 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
@@ -31,6 +31,7 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.metadata.KafkaConfigSchema;
+import org.apache.kafka.metadata.SupportedConfigChecker;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.mutable.BoundedList;
@@ -77,6 +78,7 @@ public class ConfigurationControlManager {
private final Map<String, Object> staticConfig;
private final ConfigResource currentController;
private final FeatureControlManager featureControl;
+ private final SupportedConfigChecker supportedConfigChecker;
static class Builder {
private LogContext logContext = null;
@@ -88,6 +90,7 @@ public class ConfigurationControlManager {
private Map<String, Object> staticConfig = Collections.emptyMap();
private int nodeId = 0;
private FeatureControlManager featureControl = null;
+ private SupportedConfigChecker supportedConfigChecker =
SupportedConfigChecker.TRUE;
Builder setLogContext(LogContext logContext) {
this.logContext = logContext;
@@ -134,6 +137,11 @@ public class ConfigurationControlManager {
return this;
}
+ Builder setSupportedConfigChecker(SupportedConfigChecker
supportedConfigChecker) {
+ this.supportedConfigChecker = supportedConfigChecker;
+ return this;
+ }
+
ConfigurationControlManager build() {
if (logContext == null) logContext = new LogContext();
if (snapshotRegistry == null) snapshotRegistry = new
SnapshotRegistry(logContext);
@@ -152,7 +160,8 @@ public class ConfigurationControlManager {
validator,
staticConfig,
nodeId,
- featureControl);
+ featureControl,
+ supportedConfigChecker);
}
}
@@ -164,7 +173,8 @@ public class ConfigurationControlManager {
ConfigurationValidator validator,
Map<String, Object> staticConfig,
int nodeId,
- FeatureControlManager featureControl
+ FeatureControlManager featureControl,
+ SupportedConfigChecker supportedConfigChecker
) {
this.log = logContext.logger(ConfigurationControlManager.class);
this.snapshotRegistry = snapshotRegistry;
@@ -177,6 +187,7 @@ public class ConfigurationControlManager {
this.staticConfig = Collections.unmodifiableMap(new
HashMap<>(staticConfig));
this.currentController = new ConfigResource(Type.BROKER,
Integer.toString(nodeId));
this.featureControl = featureControl;
+ this.supportedConfigChecker = supportedConfigChecker;
}
SnapshotRegistry snapshotRegistry() {
@@ -504,6 +515,13 @@ public class ConfigurationControlManager {
public void replay(ConfigRecord record) {
Type type = Type.forId(record.resourceType());
ConfigResource configResource = new ConfigResource(type,
record.resourceName());
+
+ if (!supportedConfigChecker.isSupported(configResource.type(),
record.name())) {
+ // We skip unsupported configs during replay. This can happen when
the config was
+ // deprecated and removed, but old records still exist in the log.
+ log.info("Skipping unsupported config {} for resource {} during
replay", record.name(), configResource);
+ return;
+ }
TimelineHashMap<String, String> configs =
configData.get(configResource);
if (configs == null) {
configs = new TimelineHashMap<>(snapshotRegistry, 0);
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index fc5f99358f2..95ce74d7ccb 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -100,6 +100,7 @@ import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.metadata.FinalizedControllerFeatures;
import org.apache.kafka.metadata.KafkaConfigSchema;
+import org.apache.kafka.metadata.SupportedConfigChecker;
import org.apache.kafka.metadata.VersionRange;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
import org.apache.kafka.metadata.placement.ReplicaPlacer;
@@ -215,6 +216,7 @@ public final class QuorumController implements Controller {
private Optional<CreateTopicPolicy> createTopicPolicy =
Optional.empty();
private Optional<AlterConfigPolicy> alterConfigPolicy =
Optional.empty();
private ConfigurationValidator configurationValidator =
ConfigurationValidator.NO_OP;
+ private SupportedConfigChecker supportedConfigChecker =
SupportedConfigChecker.TRUE;
private Map<String, Object> staticConfig = Collections.emptyMap();
private BootstrapMetadata bootstrapMetadata = null;
private int maxRecordsPerBatch = DEFAULT_MAX_RECORDS_PER_BATCH;
@@ -352,6 +354,11 @@ public final class QuorumController implements Controller {
return this;
}
+ public Builder setSupportedConfigChecker(SupportedConfigChecker
supportedConfigChecker) {
+ this.supportedConfigChecker = supportedConfigChecker;
+ return this;
+ }
+
public Builder setStaticConfig(Map<String, Object> staticConfig) {
this.staticConfig = staticConfig;
return this;
@@ -440,6 +447,7 @@ public final class QuorumController implements Controller {
createTopicPolicy,
alterConfigPolicy,
configurationValidator,
+ supportedConfigChecker,
staticConfig,
bootstrapMetadata,
maxRecordsPerBatch,
@@ -1498,6 +1506,7 @@ public final class QuorumController implements Controller
{
Optional<CreateTopicPolicy> createTopicPolicy,
Optional<AlterConfigPolicy> alterConfigPolicy,
ConfigurationValidator configurationValidator,
+ SupportedConfigChecker supportedConfigChecker,
Map<String, Object> staticConfig,
BootstrapMetadata bootstrapMetadata,
int maxRecordsPerBatch,
@@ -1557,6 +1566,7 @@ public final class QuorumController implements Controller
{
setExistenceChecker(resourceExists).
setAlterConfigPolicy(alterConfigPolicy).
setValidator(configurationValidator).
+ setSupportedConfigChecker(supportedConfigChecker).
setStaticConfig(staticConfig).
setNodeId(nodeId).
setFeatureControl(featureControl).
diff --git
a/metadata/src/main/java/org/apache/kafka/image/ConfigurationDelta.java
b/metadata/src/main/java/org/apache/kafka/image/ConfigurationDelta.java
index dc550d8c72a..63e6c2b270c 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ConfigurationDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ConfigurationDelta.java
@@ -17,7 +17,9 @@
package org.apache.kafka.image;
+import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.metadata.SupportedConfigChecker;
import java.util.HashMap;
import java.util.Map;
@@ -29,11 +31,14 @@ import java.util.Optional;
* Represents changes to the configurations in the metadata image.
*/
public final class ConfigurationDelta {
+
private final ConfigurationImage image;
private final Map<String, Optional<String>> changes = new HashMap<>();
+ private final SupportedConfigChecker supportedConfigChecker;
- public ConfigurationDelta(ConfigurationImage image) {
+ public ConfigurationDelta(ConfigurationImage image, SupportedConfigChecker
supportedConfigChecker) {
this.image = image;
+ this.supportedConfigChecker = supportedConfigChecker;
}
public void finishSnapshot() {
@@ -45,6 +50,12 @@ public final class ConfigurationDelta {
}
public void replay(ConfigRecord record) {
+ ConfigResource.Type type =
ConfigResource.Type.forId(record.resourceType());
+ if (!supportedConfigChecker.isSupported(type, record.name())) {
+ // We skip unsupported configs during replay. This can happen when
the config was
+ // deprecated and removed, but old records still exist in the log.
+ return;
+ }
changes.put(record.name(), Optional.ofNullable(record.value()));
}
diff --git
a/metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java
b/metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java
index 9d794bfddc2..6587baad9af 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.ConfigResource.Type;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
+import org.apache.kafka.metadata.SupportedConfigChecker;
import org.apache.kafka.server.common.MetadataVersion;
import java.util.Collections;
@@ -35,9 +36,11 @@ import java.util.Map.Entry;
public final class ConfigurationsDelta {
private final ConfigurationsImage image;
private final Map<ConfigResource, ConfigurationDelta> changes = new
HashMap<>();
+ private final SupportedConfigChecker supportedConfigChecker;
- public ConfigurationsDelta(ConfigurationsImage image) {
+ public ConfigurationsDelta(ConfigurationsImage image,
SupportedConfigChecker supportedConfigChecker) {
this.image = image;
+ this.supportedConfigChecker = supportedConfigChecker;
}
public Map<ConfigResource, ConfigurationDelta> changes() {
@@ -49,7 +52,7 @@ public final class ConfigurationsDelta {
ConfigResource resource = entry.getKey();
ConfigurationImage configImage = entry.getValue();
ConfigurationDelta configDelta = changes.computeIfAbsent(resource,
- __ -> new ConfigurationDelta(configImage));
+ __ -> new ConfigurationDelta(configImage,
supportedConfigChecker));
configDelta.finishSnapshot();
}
}
@@ -64,7 +67,7 @@ public final class ConfigurationsDelta {
ConfigurationImage configImage =
image.resourceData().getOrDefault(resource,
new ConfigurationImage(resource, Collections.emptyMap()));
ConfigurationDelta delta = changes.computeIfAbsent(resource,
- __ -> new ConfigurationDelta(configImage));
+ __ -> new ConfigurationDelta(configImage, supportedConfigChecker));
delta.replay(record);
}
@@ -74,7 +77,7 @@ public final class ConfigurationsDelta {
if (image.resourceData().containsKey(resource)) {
ConfigurationImage configImage =
image.resourceData().get(resource);
ConfigurationDelta delta = changes.computeIfAbsent(resource,
- __ -> new ConfigurationDelta(configImage));
+ __ -> new ConfigurationDelta(configImage,
supportedConfigChecker));
delta.deleteAll();
}
}
diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
index b934d10f6d1..36d68dff5a2 100644
--- a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
@@ -40,6 +40,7 @@ import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.metadata.UserScramCredentialRecord;
import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.metadata.SupportedConfigChecker;
import org.apache.kafka.server.common.MetadataVersion;
import java.util.Optional;
@@ -51,19 +52,27 @@ import java.util.Optional;
public final class MetadataDelta {
public static class Builder {
private MetadataImage image = MetadataImage.EMPTY;
+ private SupportedConfigChecker supportedConfigChecker =
SupportedConfigChecker.TRUE;
public Builder setImage(MetadataImage image) {
this.image = image;
return this;
}
+ public Builder setSupportedConfigChecker(SupportedConfigChecker
supportedConfigChecker) {
+ this.supportedConfigChecker = supportedConfigChecker;
+ return this;
+ }
+
public MetadataDelta build() {
- return new MetadataDelta(image);
+ return new MetadataDelta(image, supportedConfigChecker);
}
}
private final MetadataImage image;
+ private final SupportedConfigChecker supportedConfigChecker;
+
private FeaturesDelta featuresDelta = null;
private ClusterDelta clusterDelta = null;
@@ -82,8 +91,9 @@ public final class MetadataDelta {
private DelegationTokenDelta delegationTokenDelta = null;
- public MetadataDelta(MetadataImage image) {
+ private MetadataDelta(MetadataImage image, SupportedConfigChecker
supportedConfigChecker) {
this.image = image;
+ this.supportedConfigChecker = supportedConfigChecker;
}
public MetadataImage image() {
@@ -122,7 +132,7 @@ public final class MetadataDelta {
}
public ConfigurationsDelta getOrCreateConfigsDelta() {
- if (configsDelta == null) configsDelta = new
ConfigurationsDelta(image.configs());
+ if (configsDelta == null) configsDelta = new
ConfigurationsDelta(image.configs(), supportedConfigChecker);
return configsDelta;
}
diff --git
a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataBatchLoader.java
b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataBatchLoader.java
index c4b6286f2a9..f8cfa2005ee 100644
---
a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataBatchLoader.java
+++
b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataBatchLoader.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.metadata.SupportedConfigChecker;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
@@ -58,6 +59,7 @@ public class MetadataBatchLoader {
private final Time time;
private final FaultHandler faultHandler;
private final MetadataUpdater callback;
+ private final SupportedConfigChecker supportedConfigChecker;
private MetadataImage image;
private MetadataDelta delta;
@@ -74,12 +76,14 @@ public class MetadataBatchLoader {
LogContext logContext,
Time time,
FaultHandler faultHandler,
- MetadataUpdater callback
+ MetadataUpdater callback,
+ SupportedConfigChecker supportedConfigChecker
) {
this.log = logContext.logger(MetadataBatchLoader.class);
this.time = time;
this.faultHandler = faultHandler;
this.callback = callback;
+ this.supportedConfigChecker = supportedConfigChecker;
this.resetToImage(MetadataImage.EMPTY);
this.hasSeenRecord = false;
}
@@ -101,7 +105,10 @@ public class MetadataBatchLoader {
public final void resetToImage(MetadataImage image) {
this.image = image;
this.hasSeenRecord = !image.isEmpty();
- this.delta = new MetadataDelta.Builder().setImage(image).build();
+ this.delta = new MetadataDelta.Builder()
+ .setImage(image)
+ .setSupportedConfigChecker(supportedConfigChecker)
+ .build();
this.transactionState = TransactionState.NO_TRANSACTION;
this.lastOffset = image.provenance().lastContainedOffset();
this.lastEpoch = image.provenance().lastContainedEpoch();
@@ -199,7 +206,10 @@ public class MetadataBatchLoader {
log.debug("handleCommit: publishing empty delta between {} and
{} from {} batch(es) " +
"since a transaction was aborted", image.offset(),
manifest.provenance().lastContainedOffset(),
manifest.numBatches());
- applyDeltaAndUpdate(new
MetadataDelta.Builder().setImage(image).build(), manifest);
+ applyDeltaAndUpdate(new MetadataDelta.Builder()
+ .setImage(image)
+ .setSupportedConfigChecker(supportedConfigChecker)
+ .build(), manifest);
break;
case ENDED_TRANSACTION:
case NO_TRANSACTION:
diff --git
a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
index 4fabd1863b6..4562238916e 100644
--- a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
+++ b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
@@ -26,6 +26,7 @@ import
org.apache.kafka.image.loader.metrics.MetadataLoaderMetrics;
import org.apache.kafka.image.publisher.MetadataPublisher;
import org.apache.kafka.image.writer.ImageReWriter;
import org.apache.kafka.image.writer.ImageWriterOptions;
+import org.apache.kafka.metadata.SupportedConfigChecker;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.Batch;
@@ -76,6 +77,7 @@ public class MetadataLoader implements
RaftClient.Listener<ApiMessageAndVersion>
private FaultHandler faultHandler = FaultHandlerException::new;
private MetadataLoaderMetrics metrics = null;
private Supplier<OptionalLong> highWaterMarkAccessor = null;
+ private SupportedConfigChecker supportedConfigChecker =
SupportedConfigChecker.TRUE;
public Builder setNodeId(int nodeId) {
this.nodeId = nodeId;
@@ -107,6 +109,11 @@ public class MetadataLoader implements
RaftClient.Listener<ApiMessageAndVersion>
return this;
}
+ public Builder setSupportedConfigChecker(SupportedConfigChecker
supportedConfigChecker) {
+ this.supportedConfigChecker = supportedConfigChecker;
+ return this;
+ }
+
public MetadataLoader build() {
if (logContext == null) {
logContext = new LogContext("[MetadataLoader id=" + nodeId +
"] ");
@@ -126,7 +133,8 @@ public class MetadataLoader implements
RaftClient.Listener<ApiMessageAndVersion>
threadNamePrefix,
faultHandler,
metrics,
- highWaterMarkAccessor);
+ highWaterMarkAccessor,
+ supportedConfigChecker);
}
}
@@ -190,19 +198,26 @@ public class MetadataLoader implements
RaftClient.Listener<ApiMessageAndVersion>
*/
private final KafkaEventQueue eventQueue;
+ /**
+ * Config checker for filtering unsupported configurations.
+ */
+ private final SupportedConfigChecker supportedConfigChecker;
+
private MetadataLoader(
Time time,
LogContext logContext,
String threadNamePrefix,
FaultHandler faultHandler,
MetadataLoaderMetrics metrics,
- Supplier<OptionalLong> highWaterMarkAccessor
+ Supplier<OptionalLong> highWaterMarkAccessor,
+ SupportedConfigChecker supportedConfigChecker
) {
this.log = logContext.logger(MetadataLoader.class);
this.time = time;
this.faultHandler = faultHandler;
this.metrics = metrics;
this.highWaterMarkAccessor = highWaterMarkAccessor;
+ this.supportedConfigChecker = supportedConfigChecker;
this.uninitializedPublishers = new LinkedHashMap<>();
this.publishers = new LinkedHashMap<>();
this.image = MetadataImage.EMPTY;
@@ -210,7 +225,8 @@ public class MetadataLoader implements
RaftClient.Listener<ApiMessageAndVersion>
logContext,
time,
faultHandler,
- this::maybePublishMetadata);
+ this::maybePublishMetadata,
+ supportedConfigChecker);
this.eventQueue = new KafkaEventQueue(
Time.SYSTEM,
logContext,
@@ -289,6 +305,7 @@ public class MetadataLoader implements
RaftClient.Listener<ApiMessageAndVersion>
// haven't seen anything previously.
MetadataDelta delta = new MetadataDelta.Builder().
setImage(MetadataImage.EMPTY).
+ setSupportedConfigChecker(supportedConfigChecker).
build();
ImageReWriter writer = new ImageReWriter(delta);
image.write(writer, new
ImageWriterOptions.Builder(image.features().metadataVersionOrThrow()).
@@ -381,6 +398,7 @@ public class MetadataLoader implements
RaftClient.Listener<ApiMessageAndVersion>
snapshotName, numLoaded);
MetadataDelta delta = new MetadataDelta.Builder().
setImage(image).
+ setSupportedConfigChecker(supportedConfigChecker).
build();
SnapshotManifest manifest = loadSnapshot(delta, reader);
log.info("handleLoadSnapshot({}): generated a metadata delta
between offset {} " +
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/SupportedConfigChecker.java
b/metadata/src/main/java/org/apache/kafka/metadata/SupportedConfigChecker.java
new file mode 100644
index 00000000000..df9bf984d7d
--- /dev/null
+++
b/metadata/src/main/java/org/apache/kafka/metadata/SupportedConfigChecker.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata;
+
+import org.apache.kafka.common.config.ConfigResource;
+
+/**
+ * Interface for checking if a configuration name is supported for a given
resource type.
+ */
+@FunctionalInterface
+public interface SupportedConfigChecker {
+ /**
+ * Check if a configuration name is supported for the given resource type.
+ *
+ * @param resourceType the type of resource (broker, topic, group, etc.)
+ * @param configName the name of the configuration
+ * @return true if the configuration is supported for the resource type,
false otherwise
+ */
+ boolean isSupported(ConfigResource.Type resourceType, String configName);
+
+ /**
+ * A SupportedConfigChecker that always returns true, accepting all
configurations.
+ */
+ SupportedConfigChecker TRUE = (resourceType, configName) -> true;
+}
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
index 4c1c73f64ac..adf7a1d2c49 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
@@ -25,10 +25,12 @@ import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.metadata.KafkaConfigSchema;
import org.apache.kafka.metadata.RecordTestUtils;
+import org.apache.kafka.metadata.SupportedConfigChecker;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.common.MetadataVersion;
@@ -534,4 +536,69 @@ public class ConfigurationControlManagerTest {
assertEquals(Errors.INVALID_UPDATE_VERSION,
result.response().error());
}
}
+
+ private FeatureControlManager createFeatureControlManager() {
+ FeatureControlManager featureControlManager = new
FeatureControlManager.Builder().build();
+ featureControlManager.replay(new FeatureLevelRecord().
+ setName(MetadataVersion.FEATURE_NAME).
+ setFeatureLevel(MetadataVersion.LATEST_PRODUCTION.featureLevel()));
+ return featureControlManager;
+ }
+
+ @Test
+ public void testValidateAlterConfigWithInvalidExistingConfigs() {
+ Set<String> validConfigs = Set.of("abc", "def");
+ SupportedConfigChecker supportedConfigChecker = (resourceType,
configName) -> validConfigs.contains(configName);
+
+ ConfigurationControlManager manager = new
ConfigurationControlManager.Builder().
+ setFeatureControl(createFeatureControlManager()).
+ setKafkaConfigSchema(SCHEMA).
+ setSupportedConfigChecker(supportedConfigChecker).
+ build();
+
+ manager.replay(new ConfigRecord().
+ setResourceType(TOPIC.id()).setResourceName("mytopic").
+ setName("abc").setValue("value1"));
+ manager.replay(new ConfigRecord().
+ setResourceType(TOPIC.id()).setResourceName("mytopic").
+ setName("invalid.config").setValue("should-be-filtered"));
+
+ Map<String, String> configs = manager.getConfigs(MYTOPIC);
+ assertTrue(configs.containsKey("abc"));
+ assertFalse(configs.containsKey("invalid.config"));
+
+ ControllerResult<ApiError> result = manager.incrementalAlterConfig(
+ MYTOPIC,
+ toMap(entry("def", entry(SET, "newValue"))),
+ false);
+
+ assertEquals(ApiError.NONE, result.response());
+ }
+
+ @Test
+ public void testReplayFiltersInvalidConfigs() {
+ Set<String> validConfigs = Set.of("abc", "def", "ghi");
+ SupportedConfigChecker supportedConfigChecker = (resourceType,
configName) -> validConfigs.contains(configName);
+
+ ConfigurationControlManager manager = new
ConfigurationControlManager.Builder().
+ setKafkaConfigSchema(SCHEMA).
+ setSupportedConfigChecker(supportedConfigChecker).
+ build();
+
+ manager.replay(new ConfigRecord().
+ setResourceType(TOPIC.id()).setResourceName("mytopic").
+ setName("abc").setValue("value1"));
+ manager.replay(new ConfigRecord().
+ setResourceType(TOPIC.id()).setResourceName("mytopic").
+ setName("def").setValue("value2"));
+ manager.replay(new ConfigRecord().
+ setResourceType(TOPIC.id()).setResourceName("mytopic").
+ setName("invalid.config").setValue("should-be-filtered"));
+
+ Map<String, String> configs = manager.getConfigs(MYTOPIC);
+ assertEquals(2, configs.size());
+ assertTrue(configs.containsKey("abc"));
+ assertTrue(configs.containsKey("def"));
+ assertFalse(configs.containsKey("invalid.config"));
+ }
}
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index f50ba5a9110..72f059e7d1b 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -96,6 +96,7 @@ import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.metadata.RecordTestUtils.ImageDeltaPair;
import
org.apache.kafka.metadata.RecordTestUtils.TestThroughAllIntermediateImagesLeadingToFinalImageHelper;
+import org.apache.kafka.metadata.SupportedConfigChecker;
import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
import org.apache.kafka.metadata.util.BatchFileWriter;
@@ -1656,7 +1657,7 @@ public class QuorumControllerTest {
new ImageDeltaPair<>(() -> AclsImage.EMPTY, AclsDelta::new),
new ImageDeltaPair<>(() -> ClientQuotasImage.EMPTY,
ClientQuotasDelta::new),
new ImageDeltaPair<>(() -> ClusterImage.EMPTY, ClusterDelta::new),
- new ImageDeltaPair<>(() -> ConfigurationsImage.EMPTY,
ConfigurationsDelta::new),
+ new ImageDeltaPair<>(() -> ConfigurationsImage.EMPTY, image -> new
ConfigurationsDelta(image, SupportedConfigChecker.TRUE)),
new ImageDeltaPair<>(() -> DelegationTokenImage.EMPTY,
DelegationTokenDelta::new),
new ImageDeltaPair<>(() -> FeaturesImage.EMPTY,
FeaturesDelta::new),
new ImageDeltaPair<>(() -> ProducerIdsImage.EMPTY,
ProducerIdsDelta::new),
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisherTest.java
b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisherTest.java
index ffde7b86be9..6a743b026eb 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisherTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisherTest.java
@@ -144,7 +144,9 @@ public class ControllerMetadataMetricsPublisherTest {
@Test
public void testLoadSnapshot() {
try (TestEnv env = new TestEnv()) {
- MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+ MetadataDelta delta = new MetadataDelta.Builder()
+ .setImage(MetadataImage.EMPTY)
+ .build();
ImageReWriter writer = new ImageReWriter(delta);
IMAGE1.write(writer, new
ImageWriterOptions.Builder(MetadataVersion.MINIMUM_VERSION).build());
env.publisher.onMetadataUpdate(delta, IMAGE1, fakeManifest(true));
diff --git
a/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java
b/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java
index 2135a498bf4..d9a6bb35197 100644
--- a/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.image.writer.RecordListWriter;
import org.apache.kafka.metadata.RecordTestUtils;
+import org.apache.kafka.metadata.SupportedConfigChecker;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
@@ -34,10 +35,13 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
import static
org.apache.kafka.common.metadata.MetadataRecordType.CONFIG_RECORD;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(value = 40)
@@ -83,7 +87,7 @@ public class ConfigurationsImageTest {
setResourceName("2").setName("foo").setValue("bar"),
CONFIG_RECORD.highestSupportedVersion()));
- DELTA1 = new ConfigurationsDelta(IMAGE1);
+ DELTA1 = new ConfigurationsDelta(IMAGE1, SupportedConfigChecker.TRUE);
RecordTestUtils.replayAll(DELTA1, DELTA1_RECORDS);
Map<ConfigResource, ConfigurationImage> map2 = new HashMap<>();
@@ -119,6 +123,43 @@ public class ConfigurationsImageTest {
testToImage(IMAGE2);
}
+ @Test
+ public void testConfigurationDeltaFiltering() {
+ Set<String> validConfigs = Set.of("foo", "bar");
+ SupportedConfigChecker supportedConfigChecker = (resourceType,
configName) -> validConfigs.contains(configName);
+
+ Map<String, String> initialConfigs = Map.of("foo", "value1"); // valid
+ ConfigurationImage image = new ConfigurationImage(new
ConfigResource(BROKER, "0"), initialConfigs);
+
+ ConfigurationDelta delta = new ConfigurationDelta(image,
supportedConfigChecker);
+ delta.replay(new
ConfigRecord().setResourceType(BROKER.id()).setResourceName("0")
+ .setName("bar").setValue("value2"));
+ delta.replay(new
ConfigRecord().setResourceType(BROKER.id()).setResourceName("0")
+ .setName("qux").setValue("value3"));
+
+ ConfigurationImage result = delta.apply();
+
+ assertTrue(result.data().containsKey("foo"));
+ assertTrue(result.data().containsKey("bar"));
+ assertFalse(result.data().containsKey("qux"));
+ }
+
+ @Test
+ public void testConfigurationDeltaWithoutFiltering() {
+ Map<String, String> initialConfigs = Map.of("foo", "value1", "bar",
"value2");
+ ConfigurationImage image = new ConfigurationImage(new
ConfigResource(BROKER, "0"), initialConfigs);
+
+ ConfigurationDelta delta = new ConfigurationDelta(image,
SupportedConfigChecker.TRUE);
+ delta.replay(new
ConfigRecord().setResourceType(BROKER.id()).setResourceName("0")
+ .setName("baz").setValue("value3"));
+
+ ConfigurationImage result = delta.apply();
+
+ assertTrue(result.data().containsKey("foo"));
+ assertTrue(result.data().containsKey("bar"));
+ assertTrue(result.data().containsKey("baz"));
+ }
+
private static void testToImage(ConfigurationsImage image) {
testToImage(image, Optional.empty());
}
@@ -131,7 +172,7 @@ public class ConfigurationsImageTest {
// test from empty image stopping each of the various intermediate
images along the way
new
RecordTestUtils.TestThroughAllIntermediateImagesLeadingToFinalImageHelper<>(
() -> ConfigurationsImage.EMPTY,
- ConfigurationsDelta::new
+ img -> new ConfigurationsDelta(img, SupportedConfigChecker.TRUE)
).test(image, fromRecords);
}
diff --git
a/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java
b/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java
index 715ac0beed4..e79408c66e0 100644
--- a/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java
@@ -140,7 +140,7 @@ public class MetadataImageTest {
// test from empty image stopping each of the various intermediate
images along the way
new
RecordTestUtils.TestThroughAllIntermediateImagesLeadingToFinalImageHelper<>(
() -> MetadataImage.EMPTY,
- MetadataDelta::new
+ img -> new MetadataDelta.Builder().setImage(img).build()
) {
@Override
public MetadataImage createImageByApplyingDelta(MetadataDelta
delta) {
diff --git
a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataBatchLoaderTest.java
b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataBatchLoaderTest.java
index e0656c8e267..fafba8bb237 100644
---
a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataBatchLoaderTest.java
+++
b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataBatchLoaderTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.metadata.SupportedConfigChecker;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
@@ -151,7 +152,8 @@ public class MetadataBatchLoaderTest {
new LogContext(),
new MockTime(),
new MockFaultHandler("testAlignedTransactionBatches"),
- updater
+ updater,
+ SupportedConfigChecker.TRUE
);
batchLoader.resetToImage(MetadataImage.EMPTY);
@@ -187,7 +189,8 @@ public class MetadataBatchLoaderTest {
new LogContext(),
new MockTime(),
new MockFaultHandler("testSingletonBeginAndEnd"),
- updater
+ updater,
+ SupportedConfigChecker.TRUE
);
// All in one commit
@@ -236,7 +239,8 @@ public class MetadataBatchLoaderTest {
new LogContext(),
new MockTime(),
faultHandler,
- updater
+ updater,
+ SupportedConfigChecker.TRUE
);
Batch<ApiMessageAndVersion> batch1 = Batch.data(
@@ -266,7 +270,8 @@ public class MetadataBatchLoaderTest {
new LogContext(),
new MockTime(),
faultHandler,
- updater
+ updater,
+ SupportedConfigChecker.TRUE
);
// First batch gets loaded fine
@@ -299,7 +304,8 @@ public class MetadataBatchLoaderTest {
new LogContext(),
new MockTime(),
faultHandler,
- updater
+ updater,
+ SupportedConfigChecker.TRUE
);
// First batch gets loaded fine
@@ -336,7 +342,8 @@ public class MetadataBatchLoaderTest {
new LogContext(),
new MockTime(),
faultHandler,
- updater
+ updater,
+ SupportedConfigChecker.TRUE
);
batchLoader.resetToImage(MetadataImage.EMPTY);
@@ -420,7 +427,8 @@ public class MetadataBatchLoaderTest {
new LogContext(),
new MockTime(),
new MockFaultHandler("testOneTransactionInMultipleBatches"),
- updater
+ updater,
+ SupportedConfigChecker.TRUE
);
batchLoader.resetToImage(MetadataImage.EMPTY);
diff --git
a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
index 7b2a6e82bc8..ff669f0db2c 100644
---
a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
+++
b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
@@ -18,9 +18,11 @@
package org.apache.kafka.image.loader;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.message.SnapshotHeaderRecord;
import org.apache.kafka.common.metadata.AbortTransactionRecord;
import org.apache.kafka.common.metadata.BeginTransactionRecord;
+import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.EndTransactionRecord;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
@@ -32,6 +34,7 @@ import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.image.publisher.MetadataPublisher;
+import org.apache.kafka.metadata.SupportedConfigChecker;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.ControlRecord;
@@ -858,4 +861,76 @@ public class MetadataLoaderTest {
}
faultHandler.maybeRethrowFirstException();
}
+
+ @Test
+ public void testUnsupportedConfigFilteredInCommit() throws Exception {
+ // Create a checker that rejects "message.format.version"
+ SupportedConfigChecker checker = (type, name) ->
+ !name.equals("message.format.version");
+
+ MockFaultHandler faultHandler = new
MockFaultHandler("testUnsupportedConfigFilteredInCommit");
+ MockPublisher publisher = new
MockPublisher("testUnsupportedConfigFilteredInCommit");
+
+ try (MetadataLoader loader = new MetadataLoader.Builder().
+ setFaultHandler(faultHandler).
+ setHighWaterMarkAccessor(() -> OptionalLong.of(1L)).
+ setSupportedConfigChecker(checker).
+ build()) {
+ loader.installPublishers(List.of(publisher)).get();
+ loadTestSnapshot(loader, 100);
+ publisher.firstPublish.get(10, TimeUnit.SECONDS);
+
+ // Commit a batch containing ConfigRecord with
message.format.version
+ loader.handleCommit(MockBatchReader.newSingleBatchReader(200, 100,
List.of(
+ new ApiMessageAndVersion(new ConfigRecord().
+ setResourceType(ConfigResource.Type.TOPIC.id()).
+ setResourceName("test-topic").
+ setName("message.format.version").
+ setValue("2.8"), (short) 0)
+ )));
+ loader.waitForAllEventsToBeHandled();
+
+ // Verify config was filtered out
+ assertTrue(publisher.latestImage.configs().configMapForResource(
+ new ConfigResource(ConfigResource.Type.TOPIC,
"test-topic")).isEmpty());
+ }
+ faultHandler.maybeRethrowFirstException();
+ }
+
+ @Test
+ public void testUnsupportedConfigFilteredInSnapshot() throws Exception {
+ // Create a checker that rejects "message.format.version"
+ SupportedConfigChecker checker = (type, name) ->
+ !name.equals("message.format.version");
+
+ MockFaultHandler faultHandler = new
MockFaultHandler("testUnsupportedConfigFilteredInSnapshot");
+ MockPublisher publisher = new
MockPublisher("testUnsupportedConfigFilteredInSnapshot");
+
+ try (MetadataLoader loader = new MetadataLoader.Builder().
+ setFaultHandler(faultHandler).
+ setHighWaterMarkAccessor(() -> OptionalLong.of(0L)).
+ setSupportedConfigChecker(checker).
+ build()) {
+ loader.installPublishers(List.of(publisher)).get();
+
+ // Load a snapshot containing ConfigRecord with
message.format.version
+ loader.handleLoadSnapshot(MockSnapshotReader.fromRecordLists(
+ new MetadataProvenance(200, 100, 4000, true), List.of(
+ List.of(new ApiMessageAndVersion(new FeatureLevelRecord().
+ setName(MetadataVersion.FEATURE_NAME).
+ setFeatureLevel(MINIMUM_VERSION.featureLevel()),
(short) 0)),
+ List.of(new ApiMessageAndVersion(new ConfigRecord().
+ setResourceType(ConfigResource.Type.TOPIC.id()).
+ setResourceName("test-topic").
+ setName("message.format.version").
+ setValue("2.8"), (short) 0))
+ )));
+ loader.waitForAllEventsToBeHandled();
+
+ // Verify config was filtered out
+ assertTrue(publisher.latestImage.configs().configMapForResource(
+ new ConfigResource(ConfigResource.Type.TOPIC,
"test-topic")).isEmpty());
+ }
+ faultHandler.maybeRethrowFirstException();
+ }
}
diff --git
a/server/src/main/java/org/apache/kafka/server/config/DefaultSupportedConfigChecker.java
b/server/src/main/java/org/apache/kafka/server/config/DefaultSupportedConfigChecker.java
new file mode 100644
index 00000000000..301d9e1d6bf
--- /dev/null
+++
b/server/src/main/java/org/apache/kafka/server/config/DefaultSupportedConfigChecker.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.coordinator.group.GroupConfig;
+import org.apache.kafka.metadata.SupportedConfigChecker;
+import org.apache.kafka.server.metrics.ClientMetricsConfigs;
+import org.apache.kafka.storage.internals.log.LogConfig;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+
+/**
+ * Default implementation of SupportedConfigChecker that checks if a
configuration name
+ * is supported for a given resource type based on the actual config
definitions.
+ *
+ * This class maintains a predicate per resource type:
+ * - TOPIC: Configurations defined in LogConfig
+ * - BROKER: All config names are accepted. Broker configs include
listener-specific
+ * prefixed configs (e.g., {@code listener.name.<name>.ssl.keystore.location})
whose names
+ * are user-defined at runtime and cannot be pre-enumerated. They also
include
+ * plugin-defined configs (e.g., custom authorizer or quota callback
configs) with
+ * arbitrary names. For these reasons BROKER configs are not filtered by
name.
+ * - CLIENT_METRICS: Configurations defined in ClientMetricsConfigs
+ * - GROUP: Configurations defined in GroupConfig
+ *
+ * Config names for resource types not listed above are considered unsupported.
+ */
+public final class DefaultSupportedConfigChecker implements
SupportedConfigChecker {
+ static final class SetContainsPredicate implements Predicate<String> {
+ private final Set<String> keys;
+
+ SetContainsPredicate(Set<String> keys) {
+ this.keys = keys;
+ }
+
+ @Override
+ public boolean test(String key) {
+ return keys.contains(key);
+ }
+ }
+
+ private final Map<ConfigResource.Type, Predicate<String>>
validConfigsByType;
+
+ public DefaultSupportedConfigChecker() {
+ this.validConfigsByType = Map.of(
+ ConfigResource.Type.TOPIC, new SetContainsPredicate(new
HashSet<>(LogConfig.configNames())),
+ ConfigResource.Type.BROKER, ignore -> true,
+ ConfigResource.Type.CLIENT_METRICS, new
SetContainsPredicate(ClientMetricsConfigs.configDef().names()),
+ ConfigResource.Type.GROUP, new
SetContainsPredicate(GroupConfig.configDef().names())
+ );
+ }
+
+ @Override
+ public boolean isSupported(ConfigResource.Type resourceType, String
configName) {
+ Predicate<String> predicate = validConfigsByType.get(resourceType);
+ return predicate != null && predicate.test(configName);
+ }
+}
diff --git
a/server/src/test/java/org/apache/kafka/server/config/DefaultSupportedConfigCheckerTest.java
b/server/src/test/java/org/apache/kafka/server/config/DefaultSupportedConfigCheckerTest.java
new file mode 100644
index 00000000000..5ada0c424bf
--- /dev/null
+++
b/server/src/test/java/org/apache/kafka/server/config/DefaultSupportedConfigCheckerTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.coordinator.group.GroupConfig;
+import org.apache.kafka.server.metrics.ClientMetricsConfigs;
+
+import org.junit.jupiter.api.Test;
+
+import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
+import static
org.apache.kafka.common.config.ConfigResource.Type.CLIENT_METRICS;
+import static org.apache.kafka.common.config.ConfigResource.Type.GROUP;
+import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class DefaultSupportedConfigCheckerTest { // checks isSupported against known and made-up names for each resource type
+ private final DefaultSupportedConfigChecker checker = new
DefaultSupportedConfigChecker(); // checker under test, built with its no-arg constructor
+
+ @Test
+ void testIsSupported() {
+ assertTrue(checker.isSupported(TOPIC,
TopicConfig.SEGMENT_BYTES_CONFIG)); // TOPIC: TopicConfig constants are expected to be accepted
+ assertTrue(checker.isSupported(TOPIC, TopicConfig.SEGMENT_MS_CONFIG));
+ assertTrue(checker.isSupported(TOPIC,
TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG));
+ assertFalse(checker.isSupported(TOPIC, "invalid.topic.config")); // a made-up TOPIC name is expected to be rejected
+
+ assertTrue(checker.isSupported(BROKER, "log.cleaner.threads")); // BROKER: every name in this group is expected to pass
+ assertTrue(checker.isSupported(BROKER, "num.network.threads"));
+ assertTrue(checker.isSupported(BROKER, "log.segment.bytes"));
+ assertTrue(checker.isSupported(BROKER,
"listener.name.EXTERNAL.ssl.keystore.location")); // listener-prefixed name
+ assertTrue(checker.isSupported(BROKER,
"fake.configurable.authorizer.foobar.config")); // arbitrary made-up name, still expected to pass for BROKER
+
+ assertTrue(checker.isSupported(CLIENT_METRICS,
ClientMetricsConfigs.PUSH_INTERVAL_MS)); // CLIENT_METRICS: ClientMetricsConfigs constants are expected to be accepted
+ assertTrue(checker.isSupported(CLIENT_METRICS,
ClientMetricsConfigs.SUBSCRIPTION_METRICS));
+ assertTrue(checker.isSupported(CLIENT_METRICS,
ClientMetricsConfigs.CLIENT_MATCH_PATTERN));
+ assertFalse(checker.isSupported(CLIENT_METRICS,
"invalid.client.metrics.config")); // a made-up CLIENT_METRICS name is expected to be rejected
+
+ assertTrue(checker.isSupported(GROUP,
GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG)); // GROUP: GroupConfig constants are expected to be accepted
+ assertTrue(checker.isSupported(GROUP,
GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG));
+ assertFalse(checker.isSupported(GROUP, "invalid.group.config")); // a made-up GROUP name is expected to be rejected
+
+ assertTrue(checker.isSupported(TOPIC,
QuotaConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG)); // QuotaConfig has no import here; presumably resolved from this test's own package - confirm
+ assertTrue(checker.isSupported(TOPIC,
QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG)); // throttled-replica list names are checked against TOPIC
+
+ assertTrue(checker.isSupported(BROKER,
QuotaConfig.LEADER_REPLICATION_THROTTLED_RATE_CONFIG)); // rate and IO-limit names are checked against BROKER
+ assertTrue(checker.isSupported(BROKER,
QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG));
+ assertTrue(checker.isSupported(BROKER,
QuotaConfig.REPLICA_ALTER_LOG_DIRS_IO_MAX_BYTES_PER_SECOND_CONFIG));
+ }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/MetadataShell.java
b/shell/src/main/java/org/apache/kafka/shell/MetadataShell.java
index 5600aa5e5ef..7a788ae52e4 100644
--- a/shell/src/main/java/org/apache/kafka/shell/MetadataShell.java
+++ b/shell/src/main/java/org/apache/kafka/shell/MetadataShell.java
@@ -23,8 +23,10 @@ import kafka.tools.TerseFailure;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.image.loader.MetadataLoader;
+import org.apache.kafka.metadata.SupportedConfigChecker;
import org.apache.kafka.metadata.util.SnapshotFileReader;
import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.config.DefaultSupportedConfigChecker;
import org.apache.kafka.server.fault.FaultHandler;
import org.apache.kafka.server.fault.LoggingFaultHandler;
import org.apache.kafka.server.util.FileLock;
@@ -176,10 +178,13 @@ public final class MetadataShell {
private void initializeWithSnapshotFileReader() throws Exception {
this.fileLock = takeDirectoryLockIfExists(parentParent(new
File(snapshotPath)));
+ SupportedConfigChecker supportedConfigChecker = new
DefaultSupportedConfigChecker();
+
this.loader = new MetadataLoader.Builder().
setFaultHandler(faultHandler).
setNodeId(-1).
setHighWaterMarkAccessor(() ->
snapshotFileReader.highWaterMark()).
+ setSupportedConfigChecker(supportedConfigChecker).
build();
snapshotFileReader = new SnapshotFileReader(snapshotPath, loader);
snapshotFileReader.startup();