This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.3 by this push:
     new 4e823a07a3e KAFKA-19851; Delete dynamic configs that were removed by 
Kafka  (#21053)
4e823a07a3e is described below

commit 4e823a07a3ef366768e967148a11b39c3759d318
Author: Zhiyan Tang <[email protected]>
AuthorDate: Mon Mar 30 11:00:20 2026 -0500

    KAFKA-19851; Delete dynamic configs that were removed by Kafka  (#21053)
    
    When upgrading from Kafka 3.x to 4.0, the metadata log may contain
    dynamic configurations that were removed in 4.0 (e.g.,
    message.format.version per KIP-724). These removed configs cause an
    InvalidConfigurationException when users attempt to modify any
    configuration, because validation checks all existing configs,
    including the removed ones.
    
    This change adds filtering to prevent unsupported or invalid
    configurations from being applied during metadata replay. The filtering
    is implemented using a SupportedConfigChecker interface that is injected
    through the relevant Builder classes. When a ConfigRecord is replayed,
    the checker validates whether the configuration name is supported for
    the given resource type. Unsupported configurations are silently ignored
    during replay, ensuring that only valid configurations enter the
    in-memory state.
    
    The SupportedConfigChecker interface provides a default TRUE
    implementation that accepts all configurations. The actual filtering
    logic is implemented by DefaultSupportedConfigChecker, which maintains
    an allowlist of valid configuration names per resource type (TOPIC,
    CLIENT_METRICS, GROUP) based on the actual config definitions. The
    filtering occurs in both the ConfigurationDelta#replay and
    ConfigurationControlManager#replay methods.
    
    Added unit tests to ensure:
    - Removed configurations are filtered during the replay operations
    - Only supported configurations appear in the resulting metadata images
    - The filtering works correctly for all resource types (TOPIC, BROKER,
    CLIENT_METRICS, GROUP)
    - DefaultSupportedConfigChecker correctly identifies supported vs
    unsupported configurations for each resource type
    
    Reviewers: José Armando García Sancio <[email protected]>, Jun Rao
    <[email protected]>, Alyssa Huang <[email protected]>, Kevin Wu
    <[email protected]>, Andrew Grant <[email protected]>
---
 build.gradle                                       |  1 +
 checkstyle/import-control.xml                      |  1 +
 .../runtime/KRaftCoordinatorMetadataImage.java     |  8 ++-
 .../common/runtime/CoordinatorRuntimeTest.java     |  6 +-
 .../runtime/KRaftCoordinatorMetadataDeltaTest.java | 12 +++-
 .../common/runtime/MetadataImageBuilder.java       |  4 +-
 .../main/scala/kafka/server/ControllerServer.scala |  1 +
 .../src/main/scala/kafka/server/SharedServer.scala |  8 ++-
 .../kafka/server/LocalLeaderEndPointTest.scala     |  8 ++-
 .../server/DefaultApiVersionManagerTest.scala      |  4 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 28 ++++++--
 .../metadata/BrokerMetadataPublisherTest.scala     | 12 +++-
 .../group/GroupCoordinatorServiceTest.java         | 52 +++++++++++----
 .../group/GroupMetadataManagerTest.java            | 32 ++++++---
 .../kafka/jmh/assignor/AssignorBenchmarkUtils.java |  4 +-
 .../jmh/coordinator/RegexResolutionBenchmark.java  |  4 +-
 .../TransactionalOffsetFetchBenchmark.java         |  4 +-
 .../metadata/KRaftMetadataRequestBenchmark.java    |  4 +-
 .../controller/ConfigurationControlManager.java    | 22 ++++++-
 .../apache/kafka/controller/QuorumController.java  | 10 +++
 .../org/apache/kafka/image/ConfigurationDelta.java | 13 +++-
 .../apache/kafka/image/ConfigurationsDelta.java    | 11 ++--
 .../java/org/apache/kafka/image/MetadataDelta.java | 16 ++++-
 .../kafka/image/loader/MetadataBatchLoader.java    | 16 ++++-
 .../apache/kafka/image/loader/MetadataLoader.java  | 24 ++++++-
 .../kafka/metadata/SupportedConfigChecker.java     | 40 +++++++++++
 .../ConfigurationControlManagerTest.java           | 60 +++++++++++++++++
 .../kafka/controller/QuorumControllerTest.java     |  3 +-
 .../ControllerMetadataMetricsPublisherTest.java    |  4 +-
 .../kafka/image/ConfigurationsImageTest.java       | 45 ++++++++++++-
 .../org/apache/kafka/image/MetadataImageTest.java  |  2 +-
 .../image/loader/MetadataBatchLoaderTest.java      | 67 +++++++++++++++++--
 .../kafka/image/loader/MetadataLoaderTest.java     | 75 +++++++++++++++++++++
 .../config/DefaultSupportedConfigChecker.java      | 77 ++++++++++++++++++++++
 .../config/DefaultSupportedConfigCheckerTest.java  | 73 ++++++++++++++++++++
 .../share/ShareCoordinatorServiceTest.java         | 12 +++-
 .../java/org/apache/kafka/shell/MetadataShell.java |  5 ++
 37 files changed, 690 insertions(+), 78 deletions(-)

diff --git a/build.gradle b/build.gradle
index 6268236cda2..5c9d580f8be 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2655,6 +2655,7 @@ project(':shell') {
     implementation project(':core')
     implementation project(':metadata')
     implementation project(':raft')
+    implementation project(':server')
 
     implementation libs.jose4j                    // for SASL/OAUTHBEARER JWT 
validation
     implementation libs.jacksonJakartarsJsonProvider
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 55efa78bcb6..f8ff30c3eb9 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -275,6 +275,7 @@
     <allow pkg="org.apache.kafka.queue"/>
     <allow pkg="org.apache.kafka.raft"/>
     <allow pkg="org.apache.kafka.server.common" />
+    <allow pkg="org.apache.kafka.server.config" />
     <allow pkg="org.apache.kafka.server.fault" />
     <allow pkg="org.apache.kafka.server.util" />
     <allow pkg="org.apache.kafka.shell"/>
diff --git 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/KRaftCoordinatorMetadataImage.java
 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/KRaftCoordinatorMetadataImage.java
index 8df13da1384..1d7163362a3 100644
--- 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/KRaftCoordinatorMetadataImage.java
+++ 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/KRaftCoordinatorMetadataImage.java
@@ -76,7 +76,13 @@ public class KRaftCoordinatorMetadataImage implements 
CoordinatorMetadataImage {
 
     @Override
     public CoordinatorMetadataDelta emptyDelta() {
-        return new KRaftCoordinatorMetadataDelta(new 
MetadataDelta(metadataImage));
+        // Note: supportedConfigChecker is not set because 
CoordinatorMetadataDelta only exposes topic-related methods.
+        // No ConfigRecord replay happens through this path, so the checker is 
never invoked.
+        return new KRaftCoordinatorMetadataDelta(
+            new MetadataDelta.Builder()
+                .setImage(metadataImage)
+                .build()
+        );
     }
 
     @Override
diff --git 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
index 95872130a76..f9bec9d3ad8 100644
--- 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
+++ 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
@@ -2229,7 +2229,11 @@ public class CoordinatorRuntimeTest {
         verify(coordinator0).onLoaded(CoordinatorMetadataImage.EMPTY);
 
         // Publish a new image.
-        CoordinatorMetadataDelta delta = new KRaftCoordinatorMetadataDelta(new 
MetadataDelta(MetadataImage.EMPTY));
+        CoordinatorMetadataDelta delta = new KRaftCoordinatorMetadataDelta(
+            new MetadataDelta.Builder()
+                .setImage(MetadataImage.EMPTY)
+                .build()
+        );
         CoordinatorMetadataImage newImage = CoordinatorMetadataImage.EMPTY;
         runtime.onMetadataUpdate(delta, newImage);
 
diff --git 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/KRaftCoordinatorMetadataDeltaTest.java
 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/KRaftCoordinatorMetadataDeltaTest.java
index e72170dc0f8..c4f6eaa05d3 100644
--- 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/KRaftCoordinatorMetadataDeltaTest.java
+++ 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/KRaftCoordinatorMetadataDeltaTest.java
@@ -57,7 +57,9 @@ public class KRaftCoordinatorMetadataDeltaTest {
             .addTopic(deletedTopicId, deletedTopicName, 1)
             .addTopic(changedTopicId, changedTopicName, 1)
             .build();
-        MetadataDelta delta = new MetadataDelta(image);
+        MetadataDelta delta = new MetadataDelta.Builder()
+            .setImage(image)
+            .build();
         delta.replay(new TopicRecord().setTopicId(topicId).setName(topicName));
         delta.replay(new 
TopicRecord().setTopicId(topicId2).setName(topicName2));
         delta.replay(new RemoveTopicRecord().setTopicId(deletedTopicId));
@@ -107,14 +109,18 @@ public class KRaftCoordinatorMetadataDeltaTest {
         Uuid topicId3 = Uuid.randomUuid();
         String topicName3 = "test-topic3";
 
-        MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+        MetadataDelta delta = new MetadataDelta.Builder()
+            .setImage(MetadataImage.EMPTY)
+            .build();
         delta.replay(new TopicRecord().setTopicId(topicId).setName(topicName));
         delta.replay(new 
TopicRecord().setTopicId(topicId2).setName(topicName2));
 
         KRaftCoordinatorMetadataDelta coordinatorDelta = new 
KRaftCoordinatorMetadataDelta(delta);
         KRaftCoordinatorMetadataDelta coordinatorDeltaCopy = new 
KRaftCoordinatorMetadataDelta(delta);
 
-        MetadataDelta delta2 = new MetadataDelta(MetadataImage.EMPTY);
+        MetadataDelta delta2 = new MetadataDelta.Builder()
+            .setImage(MetadataImage.EMPTY)
+            .build();
         delta.replay(new 
TopicRecord().setTopicId(topicId3).setName(topicName3));
         KRaftCoordinatorMetadataDelta coordinatorDelta2 = new 
KRaftCoordinatorMetadataDelta(delta2);
 
diff --git 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MetadataImageBuilder.java
 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MetadataImageBuilder.java
index 142915a69e6..e2eacf9f537 100644
--- 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MetadataImageBuilder.java
+++ 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MetadataImageBuilder.java
@@ -34,7 +34,9 @@ public class MetadataImageBuilder {
     }
 
     public MetadataImageBuilder(MetadataImage image) {
-        this.delta = new MetadataDelta(image);
+        this.delta = new MetadataDelta.Builder()
+            .setImage(image)
+            .build();
     }
 
     public MetadataImageBuilder addTopic(
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala 
b/core/src/main/scala/kafka/server/ControllerServer.scala
index 7b9291ce165..e6a7da8b6cb 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -248,6 +248,7 @@ class ControllerServer(
           setCreateTopicPolicy(createTopicPolicy.toJava).
           setAlterConfigPolicy(alterConfigPolicy.toJava).
           setConfigurationValidator(new 
ControllerConfigurationValidator(sharedServer.brokerConfig)).
+          setSupportedConfigChecker(sharedServer.supportedConfigChecker).
           setStaticConfig(config.originals).
           setBootstrapMetadata(bootstrapMetadata).
           setFatalFaultHandler(sharedServer.fatalQuorumControllerFaultHandler).
diff --git a/core/src/main/scala/kafka/server/SharedServer.scala 
b/core/src/main/scala/kafka/server/SharedServer.scala
index 4b55d300a02..9e6e2231011 100644
--- a/core/src/main/scala/kafka/server/SharedServer.scala
+++ b/core/src/main/scala/kafka/server/SharedServer.scala
@@ -30,11 +30,11 @@ import org.apache.kafka.image.loader.MetadataLoader
 import org.apache.kafka.image.loader.metrics.MetadataLoaderMetrics
 import org.apache.kafka.image.publisher.metrics.SnapshotEmitterMetrics
 import org.apache.kafka.image.publisher.{SnapshotEmitter, SnapshotGenerator}
-import org.apache.kafka.metadata.ListenerInfo
-import org.apache.kafka.metadata.MetadataRecordSerde
+import org.apache.kafka.metadata.{SupportedConfigChecker, ListenerInfo, 
MetadataRecordSerde}
 import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble
 import org.apache.kafka.raft.{Endpoints, ExternalKRaftMetrics}
 import org.apache.kafka.server.{ProcessRole, ServerSocketFactory}
+import org.apache.kafka.server.config.DefaultSupportedConfigChecker
 import org.apache.kafka.server.common.ApiMessageAndVersion
 import org.apache.kafka.server.fault.{FaultHandler, LoggingFaultHandler, 
ProcessTerminatingFaultHandler}
 import org.apache.kafka.server.metrics.{BrokerServerMetrics, 
KafkaYammerMetrics, NodeMetrics}
@@ -112,6 +112,7 @@ class SharedServer(
   private var usedByController: Boolean = false
   val brokerConfig = new KafkaConfig(sharedServerConfig.props, false)
   val controllerConfig = new KafkaConfig(sharedServerConfig.props, false)
+  val supportedConfigChecker: SupportedConfigChecker = new 
DefaultSupportedConfigChecker()
   
   // Factory for creating request handler pools with shared aggregate thread 
counter
   val requestHandlerPoolFactory = new KafkaRequestHandlerPoolFactory()
@@ -326,7 +327,8 @@ class SharedServer(
           setThreadNamePrefix(s"kafka-${sharedServerConfig.nodeId}-").
           setFaultHandler(metadataLoaderFaultHandler).
           setHighWaterMarkAccessor(() => _raftManager.client.highWatermark()).
-          setMetrics(metadataLoaderMetrics)
+          setMetrics(metadataLoaderMetrics).
+          setSupportedConfigChecker(supportedConfigChecker)
         loader = loaderBuilder.build()
         snapshotEmitter = new SnapshotEmitter.Builder().
           setNodeId(nodeId).
diff --git a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala 
b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
index b25bca7357a..e697284cdb4 100644
--- a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
+++ b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
@@ -89,7 +89,9 @@ class LocalLeaderEndPointTest extends Logging {
       alterPartitionManager = alterPartitionManager
     )
 
-    val delta = new MetadataDelta(MetadataImage.EMPTY)
+    val delta = new MetadataDelta.Builder()
+      .setImage(MetadataImage.EMPTY)
+      .build()
     delta.replay(new FeatureLevelRecord()
       .setName(MetadataVersion.FEATURE_NAME)
       .setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel())
@@ -410,7 +412,9 @@ class LocalLeaderEndPointTest extends Logging {
   }
 
   private def bumpLeaderEpoch(): Unit = {
-    val delta = new MetadataDelta(image)
+    val delta = new MetadataDelta.Builder()
+      .setImage(image)
+      .build()
     delta.replay(new PartitionChangeRecord()
       .setTopicId(topicId)
       .setPartitionId(partition)
diff --git 
a/core/src/test/scala/unit/kafka/server/DefaultApiVersionManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/DefaultApiVersionManagerTest.scala
index b524413b656..6e5c5bd8d1e 100644
--- a/core/src/test/scala/unit/kafka/server/DefaultApiVersionManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DefaultApiVersionManagerTest.scala
@@ -38,7 +38,9 @@ class DefaultApiVersionManagerTest {
   private val brokerFeatures = BrokerFeatures.createDefault(true)
   private val metadataCache = {
     val cache = new KRaftMetadataCache(1, () => KRaftVersion.LATEST_PRODUCTION)
-    val delta = new MetadataDelta(MetadataImage.EMPTY)
+    val delta = new MetadataDelta.Builder()
+      .setImage(MetadataImage.EMPTY)
+      .build()
     delta.replay(new FeatureLevelRecord()
       .setName(MetadataVersion.FEATURE_NAME)
       .setFeatureLevel(MetadataVersion.latestProduction().featureLevel())
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index aaf5409e5f3..b3b4a147f3f 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -230,7 +230,9 @@ class KafkaApisTest extends Logging {
 
   def initializeMetadataCacheWithShareGroupsEnabled(enableShareGroups: Boolean 
= true): MetadataCache = {
     val cache = new KRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_1)
-    val delta = new MetadataDelta(MetadataImage.EMPTY)
+    val delta = new MetadataDelta.Builder()
+      .setImage(MetadataImage.EMPTY)
+      .build()
     delta.replay(new FeatureLevelRecord()
       .setName(MetadataVersion.FEATURE_NAME)
       .setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel())
@@ -7040,7 +7042,9 @@ class KafkaApisTest extends Logging {
 
     metadataCache = {
       val cache = new KRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_1)
-      val delta = new MetadataDelta(MetadataImage.EMPTY)
+      val delta = new MetadataDelta.Builder()
+        .setImage(MetadataImage.EMPTY)
+        .build()
       delta.replay(new FeatureLevelRecord()
         .setName(MetadataVersion.FEATURE_NAME)
         .setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel())
@@ -7268,7 +7272,9 @@ class KafkaApisTest extends Logging {
 
     metadataCache = {
       val cache = new KRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_1)
-      val delta = new MetadataDelta(MetadataImage.EMPTY)
+      val delta = new MetadataDelta.Builder()
+        .setImage(MetadataImage.EMPTY)
+        .build()
       delta.replay(new FeatureLevelRecord()
         .setName(MetadataVersion.FEATURE_NAME)
         .setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel())
@@ -10860,7 +10866,9 @@ class KafkaApisTest extends Logging {
     val requestChannelRequest = buildRequest(new 
ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest).build())
     metadataCache = {
       val cache = new KRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_1)
-      val delta = new MetadataDelta(MetadataImage.EMPTY)
+      val delta = new MetadataDelta.Builder()
+        .setImage(MetadataImage.EMPTY)
+        .build()
       delta.replay(new FeatureLevelRecord()
         .setName(MetadataVersion.FEATURE_NAME)
         .setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel())
@@ -10994,7 +11002,9 @@ class KafkaApisTest extends Logging {
     val requestChannelRequest = buildRequest(new 
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest).build())
     metadataCache = {
       val cache = new KRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_1)
-      val delta = new MetadataDelta(MetadataImage.EMPTY)
+      val delta = new MetadataDelta.Builder()
+        .setImage(MetadataImage.EMPTY)
+        .build()
       delta.replay(new FeatureLevelRecord()
         .setName(MetadataVersion.FEATURE_NAME)
         .setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel())
@@ -11557,7 +11567,9 @@ class KafkaApisTest extends Logging {
     expectedResponse.groups.add(expectedDescribedGroup)
     metadataCache = {
       val cache = new KRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_1)
-      val delta = new MetadataDelta(MetadataImage.EMPTY)
+      val delta = new MetadataDelta.Builder()
+        .setImage(MetadataImage.EMPTY)
+        .build()
       delta.replay(new FeatureLevelRecord()
         .setName(MetadataVersion.FEATURE_NAME)
         .setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel())
@@ -11720,7 +11732,9 @@ class KafkaApisTest extends Logging {
     expectedResponse.groups.add(expectedDescribedGroup)
     metadataCache = {
       val cache = new KRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_1)
-      val delta = new MetadataDelta(MetadataImage.EMPTY)
+      val delta = new MetadataDelta.Builder()
+        .setImage(MetadataImage.EMPTY)
+        .build()
       delta.replay(new FeatureLevelRecord()
         .setName(MetadataVersion.FEATURE_NAME)
         .setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel())
diff --git 
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
 
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
index ba7445de0cb..8d192e9c80e 100644
--- 
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
@@ -215,7 +215,9 @@ class BrokerMetadataPublisherTest {
     )
 
     val topicId = Uuid.randomUuid()
-    var delta = new MetadataDelta(MetadataImage.EMPTY)
+    var delta = new MetadataDelta.Builder()
+      .setImage(MetadataImage.EMPTY)
+      .build()
     delta.replay(new TopicRecord()
       .setName(Topic.GROUP_METADATA_TOPIC_NAME)
       .setTopicId(topicId)
@@ -232,7 +234,9 @@ class BrokerMetadataPublisherTest {
     )
     val image = delta.apply(MetadataProvenance.EMPTY)
 
-    delta = new MetadataDelta(image)
+    delta = new MetadataDelta.Builder()
+      .setImage(image)
+      .build()
     delta.replay(new RemoveTopicRecord()
       .setTopicId(topicId)
     )
@@ -340,7 +344,9 @@ class BrokerMetadataPublisherTest {
     )
 
     // Share version 1 is getting passed to features delta.
-    val delta = new MetadataDelta(image)
+    val delta = new MetadataDelta.Builder()
+      .setImage(image)
+      .build()
     delta.replay(new 
FeatureLevelRecord().setName(ShareVersion.FEATURE_NAME).setFeatureLevel(1))
 
     metadataPublisher.onMetadataUpdate(
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index 200e604dbd0..76fdce5568e 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -3121,7 +3121,9 @@ public class GroupCoordinatorServiceTest {
         var image = new MetadataImageBuilder()
             .addTopic(Uuid.randomUuid(), "foo", 1)
             .build();
-        var delta = new MetadataDelta(image);
+        var delta = new MetadataDelta.Builder()
+            .setImage(image)
+            .build();
 
         assertThrows(CoordinatorNotAvailableException.class,
             () -> service.onMetadataUpdate(delta, image));
@@ -3142,7 +3144,9 @@ public class GroupCoordinatorServiceTest {
             .build();
 
         // Create a delta that deletes the topic.
-        var delta = new MetadataDelta(initialImage);
+        var delta = new MetadataDelta.Builder()
+            .setImage(initialImage)
+            .build();
         delta.replay(new RemoveTopicRecord().setTopicId(topicId));
         var newImage = delta.apply(new MetadataProvenance(1, 0, 0L, true));
 
@@ -3203,7 +3207,9 @@ public class GroupCoordinatorServiceTest {
         var image = new MetadataImageBuilder()
             .addTopic(Uuid.randomUuid(), "foo", 1)
             .build();
-        var delta = new MetadataDelta(image);
+        var delta = new MetadataDelta.Builder()
+            .setImage(image)
+            .build();
 
         assertDoesNotThrow(() -> service.onMetadataUpdate(delta, image));
 
@@ -3233,7 +3239,9 @@ public class GroupCoordinatorServiceTest {
             .build();
 
         // Create a delta that deletes the topic.
-        var delta = new MetadataDelta(initialImage);
+        var delta = new MetadataDelta.Builder()
+            .setImage(initialImage)
+            .build();
         delta.replay(new RemoveTopicRecord().setTopicId(topicId));
         var newImage = delta.apply(new MetadataProvenance(1, 0, 0L, true));
 
@@ -4057,7 +4065,9 @@ public class GroupCoordinatorServiceTest {
             .addTopic(TOPIC_ID, TOPIC_NAME, 3)
             .build();
 
-        service.onMetadataUpdate(new MetadataDelta(image), image);
+        service.onMetadataUpdate(new MetadataDelta.Builder()
+            .setImage(image)
+            .build(), image);
 
         int partition = 1;
 
@@ -4183,7 +4193,9 @@ public class GroupCoordinatorServiceTest {
             .addTopic(TOPIC_ID, TOPIC_NAME, 3)
             .build();
 
-        service.onMetadataUpdate(new MetadataDelta(image), image);
+        service.onMetadataUpdate(new MetadataDelta.Builder()
+            .setImage(image)
+            .build(), image);
 
         int partition = 1;
 
@@ -4254,7 +4266,9 @@ public class GroupCoordinatorServiceTest {
             .addTopic(TOPIC_ID, TOPIC_NAME, 3)
             .build();
 
-        service.onMetadataUpdate(new MetadataDelta(image), image);
+        service.onMetadataUpdate(new MetadataDelta.Builder()
+            .setImage(image)
+            .build(), image);
 
         int partition = 1;
 
@@ -4290,7 +4304,9 @@ public class GroupCoordinatorServiceTest {
             .addTopic(TOPIC_ID, TOPIC_NAME, 3)
             .build();
 
-        service.onMetadataUpdate(new MetadataDelta(image), image);
+        service.onMetadataUpdate(new MetadataDelta.Builder()
+            .setImage(image)
+            .build(), image);
 
         int partition = 1;
 
@@ -4327,7 +4343,9 @@ public class GroupCoordinatorServiceTest {
             .addTopic(TOPIC_ID, TOPIC_NAME, 3)
             .build();
 
-        service.onMetadataUpdate(new MetadataDelta(image), image);
+        service.onMetadataUpdate(new MetadataDelta.Builder()
+            .setImage(image)
+            .build(), image);
 
         int partition = 1;
 
@@ -5469,7 +5487,9 @@ public class GroupCoordinatorServiceTest {
             .addTopic(topicId, "topic-name", 3)
             .build();
 
-        service.onMetadataUpdate(new MetadataDelta(image), image);
+        service.onMetadataUpdate(new MetadataDelta.Builder()
+            .setImage(image)
+            .build(), image);
 
         
when(mockPersister.initializeState(ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(
             new InitializeShareGroupStateResult.Builder()
@@ -5634,7 +5654,9 @@ public class GroupCoordinatorServiceTest {
             .addTopic(topicId, "topic-name", 3)
             .build();
 
-        service.onMetadataUpdate(new MetadataDelta(image), image);
+        service.onMetadataUpdate(new MetadataDelta.Builder()
+            .setImage(image)
+            .build(), image);
 
         
when(mockPersister.initializeState(ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(
             new InitializeShareGroupStateResult.Builder()
@@ -5845,7 +5867,9 @@ public class GroupCoordinatorServiceTest {
             .addTopic(topicId, "topic-name", 1)
             .build();
 
-        service.onMetadataUpdate(new MetadataDelta(image), image);
+        service.onMetadataUpdate(new MetadataDelta.Builder()
+            .setImage(image)
+            .build(), image);
 
         
when(mockPersister.initializeState(ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(
             new InitializeShareGroupStateResult.Builder()
@@ -5914,7 +5938,9 @@ public class GroupCoordinatorServiceTest {
 
             if (serviceStartup) {
                 service.startup(() -> 1);
-                service.onMetadataUpdate(new MetadataDelta(metadataImage), 
metadataImage);
+                service.onMetadataUpdate(new MetadataDelta.Builder()
+                    .setImage(metadataImage)
+                    .build(), metadataImage);
             }
 
             return service;
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 998c597c850..ab1807a3cbc 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -1646,12 +1646,16 @@ public class GroupMetadataManagerTest {
         ));
 
         // Remove foo topic from metadata image.
-        MetadataDelta delta = new MetadataDelta(metadataImage);
+        MetadataDelta delta = new MetadataDelta.Builder()
+            .setImage(metadataImage)
+            .build();
         delta.replay(new RemoveTopicRecord().setTopicId(fooTopicId));
         MetadataImage newMetadataImage = delta.apply(MetadataProvenance.EMPTY);
 
         context.groupMetadataManager.onMetadataUpdate(
-            new KRaftCoordinatorMetadataDelta(new 
MetadataDelta(newMetadataImage)), new 
KRaftCoordinatorMetadataImage(newMetadataImage)
+            new KRaftCoordinatorMetadataDelta(new MetadataDelta.Builder()
+                .setImage(newMetadataImage)
+                .build()), new KRaftCoordinatorMetadataImage(newMetadataImage)
         );
         // If a topic is removed, related topic hash is cleanup.
         assertEquals(Map.of(), context.groupMetadataManager.topicHashCache());
@@ -4161,7 +4165,9 @@ public class GroupMetadataManagerTest {
             
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(new 
MockPartitionAssignor("range")))
             .build();
 
-        MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+        MetadataDelta delta = new MetadataDelta.Builder()
+            .setImage(MetadataImage.EMPTY)
+            .build();
         MetadataImage image = delta.apply(MetadataProvenance.EMPTY);
 
         context.groupMetadataManager.onMetadataUpdate(new 
KRaftCoordinatorMetadataDelta(delta), new KRaftCoordinatorMetadataImage(image));
@@ -4218,7 +4224,9 @@ public class GroupMetadataManagerTest {
         Uuid topicE = Uuid.randomUuid();
 
         // Create a first base image with topic a, b, c and d.
-        MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+        MetadataDelta delta = new MetadataDelta.Builder()
+            .setImage(MetadataImage.EMPTY)
+            .build();
         delta.replay(new TopicRecord().setTopicId(topicA).setName("a"));
         delta.replay(new 
PartitionRecord().setTopicId(topicA).setPartitionId(0));
         delta.replay(new TopicRecord().setTopicId(topicB).setName("b"));
@@ -4230,7 +4238,9 @@ public class GroupMetadataManagerTest {
         MetadataImage image = delta.apply(MetadataProvenance.EMPTY);
 
         // Create a delta which updates topic B, deletes topic D and creates 
topic E.
-        delta = new MetadataDelta(image);
+        delta = new MetadataDelta.Builder()
+            .setImage(image)
+            .build();
         delta.replay(new 
PartitionRecord().setTopicId(topicB).setPartitionId(2));
         delta.replay(new RemoveTopicRecord().setTopicId(topicD));
         delta.replay(new TopicRecord().setTopicId(topicE).setName("e"));
@@ -20374,7 +20384,9 @@ public class GroupMetadataManagerTest {
         Uuid topicE = Uuid.randomUuid();
 
         // Create a first base image with topic a, b, c and d.
-        MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+        MetadataDelta delta = new MetadataDelta.Builder()
+            .setImage(MetadataImage.EMPTY)
+            .build();
         delta.replay(new TopicRecord().setTopicId(topicA).setName("a"));
         delta.replay(new 
PartitionRecord().setTopicId(topicA).setPartitionId(0));
         delta.replay(new TopicRecord().setTopicId(topicB).setName("b"));
@@ -20386,7 +20398,9 @@ public class GroupMetadataManagerTest {
         MetadataImage image = delta.apply(MetadataProvenance.EMPTY);
 
         // Create a delta which updates topic B, deletes topic D and creates 
topic E.
-        delta = new MetadataDelta(image);
+        delta = new MetadataDelta.Builder()
+            .setImage(image)
+            .build();
         delta.replay(new 
PartitionRecord().setTopicId(topicB).setPartitionId(2));
         delta.replay(new RemoveTopicRecord().setTopicId(topicD));
         delta.replay(new TopicRecord().setTopicId(topicE).setName("e"));
@@ -23424,7 +23438,9 @@ public class GroupMetadataManagerTest {
             .build(2L);
 
         context.groupMetadataManager.onMetadataUpdate(
-            new KRaftCoordinatorMetadataDelta(new MetadataDelta(newImage)), 
new KRaftCoordinatorMetadataImage(newImage)
+            new KRaftCoordinatorMetadataDelta(new MetadataDelta.Builder()
+                .setImage(newImage)
+                .build()), new KRaftCoordinatorMetadataImage(newImage)
         );
 
         // A member heartbeats.
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java
index a4bcb88d227..747156f8851 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java
@@ -100,7 +100,9 @@ public class AssignorBenchmarkUtils {
         List<String> allTopicNames,
         int partitionsPerTopic
     ) {
-        MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+        MetadataDelta delta = new MetadataDelta.Builder()
+            .setImage(MetadataImage.EMPTY)
+            .build();
 
         for (String topicName : allTopicNames) {
             AssignorBenchmarkUtils.addTopic(
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/RegexResolutionBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/RegexResolutionBenchmark.java
index 289835cd1cd..516424fa656 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/RegexResolutionBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/RegexResolutionBenchmark.java
@@ -96,7 +96,9 @@ public class RegexResolutionBenchmark {
     public void setup() {
         Random random = new Random();
 
-        MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+        MetadataDelta delta = new MetadataDelta.Builder()
+            .setImage(MetadataImage.EMPTY)
+            .build();
         for (int i = 0; i < topicCount; i++) {
             String topicName =
                 WORDS.get(random.nextInt(WORDS.size())) + "_" +
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/TransactionalOffsetFetchBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/TransactionalOffsetFetchBenchmark.java
index 8149dfedf2a..a7137b49276 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/TransactionalOffsetFetchBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/TransactionalOffsetFetchBenchmark.java
@@ -83,7 +83,9 @@ public class TransactionalOffsetFetchBenchmark {
     @Setup(Level.Trial)
     public void setup() {
         LogContext logContext = new LogContext();
-        MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+        MetadataDelta delta = new MetadataDelta.Builder()
+            .setImage(MetadataImage.EMPTY)
+            .build();
         delta.replay(new TopicRecord()
             .setTopicId(Uuid.randomUuid())
             .setName(TOPIC_NAME));
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
index 5b0cf9de898..116975a8d6e 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
@@ -138,7 +138,9 @@ public class KRaftMetadataRequestBenchmark {
     }
 
     private void initializeMetadataCache() {
-        MetadataDelta buildupMetadataDelta = new 
MetadataDelta(MetadataImage.EMPTY);
+        MetadataDelta buildupMetadataDelta = new MetadataDelta.Builder()
+            .setImage(MetadataImage.EMPTY)
+            .build();
         IntStream.range(0, 5).forEach(brokerId -> {
             RegisterBrokerRecord.BrokerEndpointCollection endpoints = new 
RegisterBrokerRecord.BrokerEndpointCollection();
             endpoints.addAll(endpoints(brokerId));
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
 
b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
index 31665883e3c..013a04c3bac 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
@@ -31,6 +31,7 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.metadata.KafkaConfigSchema;
+import org.apache.kafka.metadata.SupportedConfigChecker;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
 import org.apache.kafka.server.common.MetadataVersion;
@@ -78,6 +79,7 @@ public class ConfigurationControlManager {
     private final Map<String, Object> staticConfig;
     private final ConfigResource currentController;
     private final FeatureControlManager featureControl;
+    private final SupportedConfigChecker supportedConfigChecker;
 
     static class Builder {
         private LogContext logContext = null;
@@ -89,6 +91,7 @@ public class ConfigurationControlManager {
         private Map<String, Object> staticConfig = Map.of();
         private int nodeId = 0;
         private FeatureControlManager featureControl = null;
+        private SupportedConfigChecker supportedConfigChecker = 
SupportedConfigChecker.TRUE;
 
         Builder setLogContext(LogContext logContext) {
             this.logContext = logContext;
@@ -135,6 +138,11 @@ public class ConfigurationControlManager {
             return this;
         }
 
+        Builder setSupportedConfigChecker(SupportedConfigChecker 
supportedConfigChecker) {
+            this.supportedConfigChecker = supportedConfigChecker;
+            return this;
+        }
+
         ConfigurationControlManager build() {
             if (logContext == null) logContext = new LogContext();
             if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
@@ -153,7 +161,8 @@ public class ConfigurationControlManager {
                 validator,
                 staticConfig,
                 nodeId,
-                featureControl);
+                featureControl,
+                supportedConfigChecker);
         }
     }
 
@@ -165,7 +174,8 @@ public class ConfigurationControlManager {
             ConfigurationValidator validator,
             Map<String, Object> staticConfig,
             int nodeId,
-            FeatureControlManager featureControl
+            FeatureControlManager featureControl,
+            SupportedConfigChecker supportedConfigChecker
     ) {
         this.log = logContext.logger(ConfigurationControlManager.class);
         this.snapshotRegistry = snapshotRegistry;
@@ -178,6 +188,7 @@ public class ConfigurationControlManager {
         this.staticConfig = Map.copyOf(staticConfig);
         this.currentController = new ConfigResource(Type.BROKER, 
Integer.toString(nodeId));
         this.featureControl = featureControl;
+        this.supportedConfigChecker = supportedConfigChecker;
     }
 
     SnapshotRegistry snapshotRegistry() {
@@ -527,6 +538,13 @@ public class ConfigurationControlManager {
     public void replay(ConfigRecord record) {
         Type type = Type.forId(record.resourceType());
         ConfigResource configResource = new ConfigResource(type, 
record.resourceName());
+
+        if (!supportedConfigChecker.isSupported(configResource.type(), 
record.name())) {
+            // We skip unsupported configs during replay. This can happen when 
the config was
+            // deprecated and removed, but old records still exist in the log.
+            log.info("Skipping unsupported config {} for resource {} during 
replay", record.name(), configResource);
+            return;
+        }
         TimelineHashMap<String, String> configs = 
configData.get(configResource);
         if (configs == null) {
             configs = new TimelineHashMap<>(snapshotRegistry, 0);
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java 
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 16f3438b94a..0f827d8551d 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -100,6 +100,7 @@ import org.apache.kafka.metadata.BrokerHeartbeatReply;
 import org.apache.kafka.metadata.BrokerRegistrationReply;
 import org.apache.kafka.metadata.FinalizedControllerFeatures;
 import org.apache.kafka.metadata.KafkaConfigSchema;
+import org.apache.kafka.metadata.SupportedConfigChecker;
 import org.apache.kafka.metadata.VersionRange;
 import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
 import org.apache.kafka.metadata.placement.ReplicaPlacer;
@@ -209,6 +210,7 @@ public final class QuorumController implements Controller {
         private Optional<CreateTopicPolicy> createTopicPolicy = 
Optional.empty();
         private Optional<AlterConfigPolicy> alterConfigPolicy = 
Optional.empty();
         private ConfigurationValidator configurationValidator = 
ConfigurationValidator.NO_OP;
+        private SupportedConfigChecker supportedConfigChecker = 
SupportedConfigChecker.TRUE;
         private Map<String, Object> staticConfig = Map.of();
         private BootstrapMetadata bootstrapMetadata = null;
         private int maxRecordsPerBatch = DEFAULT_MAX_RECORDS_PER_BATCH;
@@ -345,6 +347,11 @@ public final class QuorumController implements Controller {
             return this;
         }
 
+        public Builder setSupportedConfigChecker(SupportedConfigChecker 
supportedConfigChecker) {
+            this.supportedConfigChecker = supportedConfigChecker;
+            return this;
+        }
+
         public Builder setStaticConfig(Map<String, Object> staticConfig) {
             this.staticConfig = staticConfig;
             return this;
@@ -436,6 +443,7 @@ public final class QuorumController implements Controller {
                     createTopicPolicy,
                     alterConfigPolicy,
                     configurationValidator,
+                    supportedConfigChecker,
                     staticConfig,
                     bootstrapMetadata,
                     maxRecordsPerBatch,
@@ -1491,6 +1499,7 @@ public final class QuorumController implements Controller 
{
         Optional<CreateTopicPolicy> createTopicPolicy,
         Optional<AlterConfigPolicy> alterConfigPolicy,
         ConfigurationValidator configurationValidator,
+        SupportedConfigChecker supportedConfigChecker,
         Map<String, Object> staticConfig,
         BootstrapMetadata bootstrapMetadata,
         int maxRecordsPerBatch,
@@ -1553,6 +1562,7 @@ public final class QuorumController implements Controller 
{
             setStaticConfig(staticConfig).
             setNodeId(nodeId).
             setFeatureControl(featureControl).
+            setSupportedConfigChecker(supportedConfigChecker).
             build();
         this.producerIdControlManager = new ProducerIdControlManager.Builder().
             setLogContext(logContext).
diff --git 
a/metadata/src/main/java/org/apache/kafka/image/ConfigurationDelta.java 
b/metadata/src/main/java/org/apache/kafka/image/ConfigurationDelta.java
index dc550d8c72a..63e6c2b270c 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ConfigurationDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ConfigurationDelta.java
@@ -17,7 +17,9 @@
 
 package org.apache.kafka.image;
 
+import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.metadata.SupportedConfigChecker;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -29,11 +31,14 @@ import java.util.Optional;
  * Represents changes to the configurations in the metadata image.
  */
 public final class ConfigurationDelta {
+
     private final ConfigurationImage image;
     private final Map<String, Optional<String>> changes = new HashMap<>();
+    private final SupportedConfigChecker supportedConfigChecker;
 
-    public ConfigurationDelta(ConfigurationImage image) {
+    public ConfigurationDelta(ConfigurationImage image, SupportedConfigChecker 
supportedConfigChecker) {
         this.image = image;
+        this.supportedConfigChecker = supportedConfigChecker;
     }
 
     public void finishSnapshot() {
@@ -45,6 +50,12 @@ public final class ConfigurationDelta {
     }
 
     public void replay(ConfigRecord record) {
+        ConfigResource.Type type = 
ConfigResource.Type.forId(record.resourceType());
+        if (!supportedConfigChecker.isSupported(type, record.name())) {
+            // We skip unsupported configs during replay. This can happen when 
the config was
+            // deprecated and removed, but old records still exist in the log.
+            return;
+        }
         changes.put(record.name(), Optional.ofNullable(record.value()));
     }
 
diff --git 
a/metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java 
b/metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java
index 0b3fcbb3867..4d50abbbb2f 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.config.ConfigResource.Type;
 import org.apache.kafka.common.metadata.ConfigRecord;
 import org.apache.kafka.common.metadata.RemoveTopicRecord;
+import org.apache.kafka.metadata.SupportedConfigChecker;
 import org.apache.kafka.server.common.MetadataVersion;
 
 import java.util.HashMap;
@@ -34,9 +35,11 @@ import java.util.Map.Entry;
 public final class ConfigurationsDelta {
     private final ConfigurationsImage image;
     private final Map<ConfigResource, ConfigurationDelta> changes = new 
HashMap<>();
+    private final SupportedConfigChecker supportedConfigChecker;
 
-    public ConfigurationsDelta(ConfigurationsImage image) {
+    public ConfigurationsDelta(ConfigurationsImage image, 
SupportedConfigChecker supportedConfigChecker) {
         this.image = image;
+        this.supportedConfigChecker = supportedConfigChecker;
     }
 
     public Map<ConfigResource, ConfigurationDelta> changes() {
@@ -48,7 +51,7 @@ public final class ConfigurationsDelta {
             ConfigResource resource = entry.getKey();
             ConfigurationImage configImage = entry.getValue();
             ConfigurationDelta configDelta = changes.computeIfAbsent(resource,
-                __ -> new ConfigurationDelta(configImage));
+                __ -> new ConfigurationDelta(configImage, 
supportedConfigChecker));
             configDelta.finishSnapshot();
         }
     }
@@ -63,7 +66,7 @@ public final class ConfigurationsDelta {
         ConfigurationImage configImage = 
image.resourceData().getOrDefault(resource,
                 new ConfigurationImage(resource, Map.of()));
         ConfigurationDelta delta = changes.computeIfAbsent(resource,
-            __ -> new ConfigurationDelta(configImage));
+            __ -> new ConfigurationDelta(configImage, supportedConfigChecker));
         delta.replay(record);
     }
 
@@ -73,7 +76,7 @@ public final class ConfigurationsDelta {
         if (image.resourceData().containsKey(resource)) {
             ConfigurationImage configImage = 
image.resourceData().get(resource);
             ConfigurationDelta delta = changes.computeIfAbsent(resource,
-                __ -> new ConfigurationDelta(configImage));
+                __ -> new ConfigurationDelta(configImage, 
supportedConfigChecker));
             delta.deleteAll();
         }
     }
diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java 
b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
index b934d10f6d1..36d68dff5a2 100644
--- a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
@@ -40,6 +40,7 @@ import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
 import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
 import org.apache.kafka.common.metadata.UserScramCredentialRecord;
 import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.metadata.SupportedConfigChecker;
 import org.apache.kafka.server.common.MetadataVersion;
 
 import java.util.Optional;
@@ -51,19 +52,27 @@ import java.util.Optional;
 public final class MetadataDelta {
     public static class Builder {
         private MetadataImage image = MetadataImage.EMPTY;
+        private SupportedConfigChecker supportedConfigChecker = 
SupportedConfigChecker.TRUE;
 
         public Builder setImage(MetadataImage image) {
             this.image = image;
             return this;
         }
 
+        public Builder setSupportedConfigChecker(SupportedConfigChecker 
supportedConfigChecker) {
+            this.supportedConfigChecker = supportedConfigChecker;
+            return this;
+        }
+
         public MetadataDelta build() {
-            return new MetadataDelta(image);
+            return new MetadataDelta(image, supportedConfigChecker);
         }
     }
 
     private final MetadataImage image;
 
+    private final SupportedConfigChecker supportedConfigChecker;
+
     private FeaturesDelta featuresDelta = null;
 
     private ClusterDelta clusterDelta = null;
@@ -82,8 +91,9 @@ public final class MetadataDelta {
 
     private DelegationTokenDelta delegationTokenDelta = null;
 
-    public MetadataDelta(MetadataImage image) {
+    private MetadataDelta(MetadataImage image, SupportedConfigChecker 
supportedConfigChecker) {
         this.image = image;
+        this.supportedConfigChecker = supportedConfigChecker;
     }
 
     public MetadataImage image() {
@@ -122,7 +132,7 @@ public final class MetadataDelta {
     }
 
     public ConfigurationsDelta getOrCreateConfigsDelta() {
-        if (configsDelta == null) configsDelta = new 
ConfigurationsDelta(image.configs());
+        if (configsDelta == null) configsDelta = new 
ConfigurationsDelta(image.configs(), supportedConfigChecker);
         return configsDelta;
     }
 
diff --git 
a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataBatchLoader.java 
b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataBatchLoader.java
index d55078643f2..0f0cc84ff57 100644
--- 
a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataBatchLoader.java
+++ 
b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataBatchLoader.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.metadata.SupportedConfigChecker;
 import org.apache.kafka.raft.Batch;
 import org.apache.kafka.raft.LeaderAndEpoch;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
@@ -58,6 +59,7 @@ public class MetadataBatchLoader {
     private final Time time;
     private final FaultHandler faultHandler;
     private final MetadataUpdater callback;
+    private final SupportedConfigChecker supportedConfigChecker;
 
     private MetadataImage image;
     private MetadataDelta delta;
@@ -74,12 +76,14 @@ public class MetadataBatchLoader {
         LogContext logContext,
         Time time,
         FaultHandler faultHandler,
-        MetadataUpdater callback
+        MetadataUpdater callback,
+        SupportedConfigChecker supportedConfigChecker
     ) {
         this.log = logContext.logger(MetadataBatchLoader.class);
         this.time = time;
         this.faultHandler = faultHandler;
         this.callback = callback;
+        this.supportedConfigChecker = supportedConfigChecker;
         this.resetToImage(MetadataImage.EMPTY);
         this.hasSeenRecord = false;
     }
@@ -101,7 +105,10 @@ public class MetadataBatchLoader {
     public final void resetToImage(MetadataImage image) {
         this.image = image;
         this.hasSeenRecord = !image.isEmpty();
-        this.delta = new MetadataDelta.Builder().setImage(image).build();
+        this.delta = new MetadataDelta.Builder()
+            .setImage(image)
+            .setSupportedConfigChecker(supportedConfigChecker)
+            .build();
         this.transactionState = TransactionState.NO_TRANSACTION;
         this.lastOffset = image.provenance().lastContainedOffset();
         this.lastEpoch = image.provenance().lastContainedEpoch();
@@ -199,7 +206,10 @@ public class MetadataBatchLoader {
                 log.debug("handleCommit: publishing empty delta between {} and 
{} from {} batch(es) " +
                     "since a transaction was aborted", image.offset(), 
manifest.provenance().lastContainedOffset(),
                     manifest.numBatches());
-                applyDeltaAndUpdate(new 
MetadataDelta.Builder().setImage(image).build(), manifest);
+                applyDeltaAndUpdate(new MetadataDelta.Builder()
+                    .setImage(image)
+                    .setSupportedConfigChecker(supportedConfigChecker)
+                    .build(), manifest);
                 break;
             case ENDED_TRANSACTION:
             case NO_TRANSACTION:
diff --git 
a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java 
b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
index 19235e578e3..bfd42e2bdc7 100644
--- a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
+++ b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
@@ -28,6 +28,7 @@ import 
org.apache.kafka.image.loader.metrics.MetadataLoaderMetrics;
 import org.apache.kafka.image.publisher.MetadataPublisher;
 import org.apache.kafka.image.writer.ImageReWriter;
 import org.apache.kafka.image.writer.ImageWriterOptions;
+import org.apache.kafka.metadata.SupportedConfigChecker;
 import org.apache.kafka.queue.EventQueue;
 import org.apache.kafka.queue.KafkaEventQueue;
 import org.apache.kafka.raft.Batch;
@@ -81,6 +82,7 @@ public class MetadataLoader implements 
RaftClient.Listener<ApiMessageAndVersion>
         private FaultHandler faultHandler = FaultHandlerException::new;
         private MetadataLoaderMetrics metrics = null;
         private Supplier<OptionalLong> highWaterMarkAccessor = null;
+        private SupportedConfigChecker supportedConfigChecker = 
SupportedConfigChecker.TRUE;
 
         public Builder setNodeId(int nodeId) {
             this.nodeId = nodeId;
@@ -112,6 +114,11 @@ public class MetadataLoader implements 
RaftClient.Listener<ApiMessageAndVersion>
             return this;
         }
 
+        public Builder setSupportedConfigChecker(SupportedConfigChecker 
supportedConfigChecker) {
+            this.supportedConfigChecker = supportedConfigChecker;
+            return this;
+        }
+
         public MetadataLoader build() {
             if (logContext == null) {
                 logContext = new LogContext("[MetadataLoader id=" + nodeId + 
"] ");
@@ -132,7 +139,8 @@ public class MetadataLoader implements 
RaftClient.Listener<ApiMessageAndVersion>
                 threadNamePrefix,
                 faultHandler,
                 metrics,
-                highWaterMarkAccessor);
+                highWaterMarkAccessor,
+                supportedConfigChecker);
         }
     }
 
@@ -196,19 +204,26 @@ public class MetadataLoader implements 
RaftClient.Listener<ApiMessageAndVersion>
      */
     private final KafkaEventQueue eventQueue;
 
+    /**
+     * Config checker for filtering unsupported configurations.
+     */
+    private final SupportedConfigChecker supportedConfigChecker;
+
     private MetadataLoader(
         Time time,
         LogContext logContext,
         String threadNamePrefix,
         FaultHandler faultHandler,
         MetadataLoaderMetrics metrics,
-        Supplier<OptionalLong> highWaterMarkAccessor
+        Supplier<OptionalLong> highWaterMarkAccessor,
+        SupportedConfigChecker supportedConfigChecker
     ) {
         this.log = logContext.logger(MetadataLoader.class);
         this.time = time;
         this.faultHandler = faultHandler;
         this.metrics = metrics;
         this.highWaterMarkAccessor = highWaterMarkAccessor;
+        this.supportedConfigChecker = supportedConfigChecker;
         this.uninitializedPublishers = new LinkedHashMap<>();
         this.publishers = new LinkedHashMap<>();
         this.image = MetadataImage.EMPTY;
@@ -216,7 +231,8 @@ public class MetadataLoader implements 
RaftClient.Listener<ApiMessageAndVersion>
             logContext,
             time,
             faultHandler,
-            this::maybePublishMetadata);
+            this::maybePublishMetadata,
+            supportedConfigChecker);
         this.eventQueue = new KafkaEventQueue(
             time,
             logContext,
@@ -296,6 +312,7 @@ public class MetadataLoader implements 
RaftClient.Listener<ApiMessageAndVersion>
         // haven't seen anything previously.
         MetadataDelta delta = new MetadataDelta.Builder().
                 setImage(MetadataImage.EMPTY).
+                setSupportedConfigChecker(supportedConfigChecker).
                 build();
         ImageReWriter writer = new ImageReWriter(delta);
         image.write(writer, new 
ImageWriterOptions.Builder(image.features().metadataVersionOrThrow()).
@@ -406,6 +423,7 @@ public class MetadataLoader implements 
RaftClient.Listener<ApiMessageAndVersion>
                     snapshotName, numLoaded);
                 MetadataDelta delta = new MetadataDelta.Builder().
                     setImage(image).
+                    setSupportedConfigChecker(supportedConfigChecker).
                     build();
                 SnapshotManifest manifest = loadSnapshot(delta, reader);
                 log.info("handleLoadSnapshot({}): generated a metadata delta 
between offset {} " +
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/SupportedConfigChecker.java 
b/metadata/src/main/java/org/apache/kafka/metadata/SupportedConfigChecker.java
new file mode 100644
index 00000000000..df9bf984d7d
--- /dev/null
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/SupportedConfigChecker.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata;
+
+import org.apache.kafka.common.config.ConfigResource;
+
+/**
+ * Interface for checking if a configuration name is supported for a given 
resource type.
+ */
+@FunctionalInterface
+public interface SupportedConfigChecker {
+    /**
+     * Check if a configuration name is supported for the given resource type.
+     *
+     * @param resourceType the type of resource (broker, topic, group, etc.)
+     * @param configName   the name of the configuration
+     * @return true if the configuration is supported for the resource type, 
false otherwise
+     */
+    boolean isSupported(ConfigResource.Type resourceType, String configName);
+
+    /**
+     * A SupportedConfigChecker that always returns true, accepting all 
configurations.
+     */
+    SupportedConfigChecker TRUE = (resourceType, configName) -> true;
+}
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
index 1a1d5b74c41..03173552e90 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.metadata.KafkaConfigSchema;
 import org.apache.kafka.metadata.RecordTestUtils;
+import org.apache.kafka.metadata.SupportedConfigChecker;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
 import org.apache.kafka.server.common.MetadataVersion;
@@ -586,4 +587,63 @@ public class ConfigurationControlManagerTest {
             setFeatureLevel(MetadataVersion.LATEST_PRODUCTION.featureLevel()));
         return featureControlManager;
     }
+
+    @Test
+    public void testValidateAlterConfigWithInvalidExistingConfigs() {
+        Set<String> validConfigs = Set.of("abc", "def");
+        SupportedConfigChecker supportedConfigChecker = (resourceType, 
configName) -> validConfigs.contains(configName);
+
+        ConfigurationControlManager manager = new 
ConfigurationControlManager.Builder().
+            setFeatureControl(createFeatureControlManager()).
+            setKafkaConfigSchema(SCHEMA).
+            setSupportedConfigChecker(supportedConfigChecker).
+            build();
+
+        manager.replay(new ConfigRecord().
+            setResourceType(TOPIC.id()).setResourceName("mytopic").
+            setName("abc").setValue("value1"));  // valid
+        manager.replay(new ConfigRecord().
+            setResourceType(TOPIC.id()).setResourceName("mytopic").
+            setName("invalid.config").setValue("should-be-filtered"));  // 
invalid, filtered in replay()
+
+        Map<String, String> configs = manager.getConfigs(MYTOPIC);
+        assertTrue(configs.containsKey("abc"), "Valid config should be in 
configData");
+        assertFalse(configs.containsKey("invalid.config"), "Invalid config 
should be filtered out in replay()");
+
+        ControllerResult<ApiError> result = manager.incrementalAlterConfig(
+            MYTOPIC,
+            toMap(entry("def", entry(SET, "newValue"))),
+            false);
+
+        assertEquals(ApiError.NONE, result.response());
+    }
+
+    @Test
+    public void testReplayFiltersInvalidConfigs() {
+        Set<String> validConfigs = Set.of("abc", "def", "ghi");
+        SupportedConfigChecker supportedConfigChecker = (resourceType, 
configName) -> validConfigs.contains(configName);
+
+        ConfigurationControlManager manager = new 
ConfigurationControlManager.Builder().
+            setKafkaConfigSchema(SCHEMA).
+            setSupportedConfigChecker(supportedConfigChecker).
+            build();
+
+        // Replay valid configs
+        manager.replay(new ConfigRecord().
+            setResourceType(TOPIC.id()).setResourceName("mytopic").
+            setName("abc").setValue("value1"));
+        manager.replay(new ConfigRecord().
+            setResourceType(TOPIC.id()).setResourceName("mytopic").
+            setName("def").setValue("value2"));
+
+        manager.replay(new ConfigRecord().
+            setResourceType(TOPIC.id()).setResourceName("mytopic").
+            setName("invalid.config").setValue("should-be-filtered"));
+
+        Map<String, String> configs = manager.getConfigs(MYTOPIC);
+        assertEquals(2, configs.size(), "Should only have valid configs");
+        assertTrue(configs.containsKey("abc"));
+        assertTrue(configs.containsKey("def"));
+        assertFalse(configs.containsKey("invalid.config"), "Invalid config 
should not be in configData");
+    }
 }
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 54e68da302a..6a8618708d8 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -97,6 +97,7 @@ import org.apache.kafka.metadata.PartitionRegistration;
 import org.apache.kafka.metadata.RecordTestUtils;
 import org.apache.kafka.metadata.RecordTestUtils.ImageDeltaPair;
 import 
org.apache.kafka.metadata.RecordTestUtils.TestThroughAllIntermediateImagesLeadingToFinalImageHelper;
+import org.apache.kafka.metadata.SupportedConfigChecker;
 import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
 import org.apache.kafka.metadata.util.BatchFileWriter;
 import org.apache.kafka.raft.Batch;
@@ -1675,7 +1676,7 @@ public class QuorumControllerTest {
             new ImageDeltaPair<>(() -> AclsImage.EMPTY, AclsDelta::new),
             new ImageDeltaPair<>(() -> ClientQuotasImage.EMPTY, 
ClientQuotasDelta::new),
             new ImageDeltaPair<>(() -> ClusterImage.EMPTY, ClusterDelta::new),
-            new ImageDeltaPair<>(() -> ConfigurationsImage.EMPTY, 
ConfigurationsDelta::new),
+            new ImageDeltaPair<>(() -> ConfigurationsImage.EMPTY, image -> new 
ConfigurationsDelta(image, SupportedConfigChecker.TRUE)),
             new ImageDeltaPair<>(() -> DelegationTokenImage.EMPTY, 
DelegationTokenDelta::new),
             new ImageDeltaPair<>(() -> FeaturesImage.EMPTY, 
FeaturesDelta::new),
             new ImageDeltaPair<>(() -> ProducerIdsImage.EMPTY, 
ProducerIdsDelta::new),
diff --git a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisherTest.java b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisherTest.java
index ffde7b86be9..6a743b026eb 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisherTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisherTest.java
@@ -144,7 +144,9 @@ public class ControllerMetadataMetricsPublisherTest {
     @Test
     public void testLoadSnapshot() {
         try (TestEnv env = new TestEnv()) {
-            MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+            MetadataDelta delta = new MetadataDelta.Builder()
+                .setImage(MetadataImage.EMPTY)
+                .build();
             ImageReWriter writer = new ImageReWriter(delta);
             IMAGE1.write(writer, new 
ImageWriterOptions.Builder(MetadataVersion.MINIMUM_VERSION).build());
             env.publisher.onMetadataUpdate(delta, IMAGE1, fakeManifest(true));
diff --git a/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java b/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java
index 6300e1293e8..6440baa4e1b 100644
--- a/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.metadata.ConfigRecord;
 import org.apache.kafka.image.writer.RecordListWriter;
 import org.apache.kafka.metadata.RecordTestUtils;
+import org.apache.kafka.metadata.SupportedConfigChecker;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 
 import org.junit.jupiter.api.Test;
@@ -31,10 +32,13 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 
 import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
 import static 
org.apache.kafka.common.metadata.MetadataRecordType.CONFIG_RECORD;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 
 @Timeout(value = 40)
@@ -80,7 +84,7 @@ public class ConfigurationsImageTest {
             setResourceName("2").setName("foo").setValue("bar"),
             CONFIG_RECORD.highestSupportedVersion()));
 
-        DELTA1 = new ConfigurationsDelta(IMAGE1);
+        DELTA1 = new ConfigurationsDelta(IMAGE1, SupportedConfigChecker.TRUE);
         RecordTestUtils.replayAll(DELTA1, DELTA1_RECORDS);
 
         Map<ConfigResource, ConfigurationImage> map2 = new HashMap<>();
@@ -116,6 +120,43 @@ public class ConfigurationsImageTest {
         testToImage(IMAGE2);
     }
 
+    @Test
+    public void testConfigurationDeltaFiltering() {
+        // "qux" is not among the accepted names, so only its record is expected to be dropped.
+        Set<String> acceptedNames = Set.of("foo", "bar");
+        SupportedConfigChecker checker = (type, name) -> acceptedNames.contains(name);
+        ConfigResource broker0 = new ConfigResource(BROKER, "0");
+        ConfigurationImage baseImage = new ConfigurationImage(broker0, Map.of("foo", "value1"));
+
+        ConfigurationDelta delta = new ConfigurationDelta(baseImage, checker);
+        ConfigRecord barRecord = new ConfigRecord().setResourceType(BROKER.id())
+            .setResourceName("0").setName("bar").setValue("value2");
+        ConfigRecord quxRecord = new ConfigRecord().setResourceType(BROKER.id())
+            .setResourceName("0").setName("qux").setValue("value3");
+        delta.replay(barRecord);
+        delta.replay(quxRecord);
+        ConfigurationImage applied = delta.apply();
+        assertTrue(applied.data().containsKey("foo"));
+        assertTrue(applied.data().containsKey("bar"));
+        assertFalse(applied.data().containsKey("qux"));
+    }
+
+    @Test
+    public void testConfigurationDeltaWithoutFiltering() {
+        // With SupportedConfigChecker.TRUE, the initial configs and the replayed one must all be present.
+        ConfigResource broker0 = new ConfigResource(BROKER, "0");
+        ConfigurationImage baseImage = new ConfigurationImage(broker0, Map.of("foo", "value1", "bar", "value2"));
+        ConfigurationDelta delta = new ConfigurationDelta(baseImage, SupportedConfigChecker.TRUE);
+        ConfigRecord bazRecord = new ConfigRecord().setResourceType(BROKER.id())
+            .setResourceName("0").setName("baz").setValue("value3");
+        delta.replay(bazRecord);
+
+        ConfigurationImage applied = delta.apply();
+        for (String expectedName : List.of("foo", "bar", "baz")) {
+            assertTrue(applied.data().containsKey(expectedName));
+        }
+    }
+
     private static void testToImage(ConfigurationsImage image) {
         testToImage(image, Optional.empty());
     }
@@ -128,7 +169,7 @@ public class ConfigurationsImageTest {
         // test from empty image stopping each of the various intermediate 
images along the way
         new 
RecordTestUtils.TestThroughAllIntermediateImagesLeadingToFinalImageHelper<>(
             () -> ConfigurationsImage.EMPTY,
-            ConfigurationsDelta::new
+            img -> new ConfigurationsDelta(img, SupportedConfigChecker.TRUE)
         ).test(image, fromRecords);
     }
 
diff --git a/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java
index d4a091da809..5ee4ad79ec0 100644
--- a/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java
@@ -140,7 +140,7 @@ public class MetadataImageTest {
         // test from empty image stopping each of the various intermediate 
images along the way
         new 
RecordTestUtils.TestThroughAllIntermediateImagesLeadingToFinalImageHelper<>(
             () -> MetadataImage.EMPTY,
-            MetadataDelta::new
+            img -> new MetadataDelta.Builder().setImage(img).build()
         ) {
             @Override
             public MetadataImage createImageByApplyingDelta(MetadataDelta 
delta) {
diff --git a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataBatchLoaderTest.java b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataBatchLoaderTest.java
index 3df1844ca9c..ebe4365b27f 100644
--- a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataBatchLoaderTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataBatchLoaderTest.java
@@ -18,8 +18,10 @@
 package org.apache.kafka.image.loader;
 
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.metadata.AbortTransactionRecord;
 import org.apache.kafka.common.metadata.BeginTransactionRecord;
+import org.apache.kafka.common.metadata.ConfigRecord;
 import org.apache.kafka.common.metadata.EndTransactionRecord;
 import org.apache.kafka.common.metadata.NoOpRecord;
 import org.apache.kafka.common.metadata.PartitionRecord;
@@ -28,6 +30,7 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.metadata.SupportedConfigChecker;
 import org.apache.kafka.raft.Batch;
 import org.apache.kafka.raft.LeaderAndEpoch;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
@@ -40,6 +43,7 @@ import org.junit.jupiter.params.provider.ValueSource;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.OptionalInt;
+import java.util.Properties;
 import java.util.stream.IntStream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -148,7 +152,8 @@ public class MetadataBatchLoaderTest {
             new LogContext(),
             new MockTime(),
             new MockFaultHandler("testAlignedTransactionBatches"),
-            updater
+            updater,
+            SupportedConfigChecker.TRUE
         );
 
         batchLoader.resetToImage(MetadataImage.EMPTY);
@@ -184,7 +189,8 @@ public class MetadataBatchLoaderTest {
             new LogContext(),
             new MockTime(),
             new MockFaultHandler("testSingletonBeginAndEnd"),
-            updater
+            updater,
+            SupportedConfigChecker.TRUE
         );
 
         // All in one commit
@@ -233,7 +239,8 @@ public class MetadataBatchLoaderTest {
             new LogContext(),
             new MockTime(),
             faultHandler,
-            updater
+            updater,
+            SupportedConfigChecker.TRUE
         );
 
         Batch<ApiMessageAndVersion> batch1 = Batch.data(
@@ -263,7 +270,8 @@ public class MetadataBatchLoaderTest {
                 new LogContext(),
                 new MockTime(),
                 faultHandler,
-                updater
+                updater,
+                SupportedConfigChecker.TRUE
         );
 
         // First batch gets loaded fine
@@ -296,7 +304,8 @@ public class MetadataBatchLoaderTest {
             new LogContext(),
             new MockTime(),
             faultHandler,
-            updater
+            updater,
+            SupportedConfigChecker.TRUE
         );
 
         // First batch gets loaded fine
@@ -333,7 +342,8 @@ public class MetadataBatchLoaderTest {
             new LogContext(),
             new MockTime(),
             faultHandler,
-            updater
+            updater,
+            SupportedConfigChecker.TRUE
         );
 
         batchLoader.resetToImage(MetadataImage.EMPTY);
@@ -417,7 +427,8 @@ public class MetadataBatchLoaderTest {
             new LogContext(),
             new MockTime(),
             new MockFaultHandler("testOneTransactionInMultipleBatches"),
-            updater
+            updater,
+            SupportedConfigChecker.TRUE
         );
 
         batchLoader.resetToImage(MetadataImage.EMPTY);
@@ -449,6 +460,48 @@ public class MetadataBatchLoaderTest {
         }
     }
 
+    @Test
+    public void testUnsupportedConfigFilteredInBatch() {
+        // Reject exactly one (type, name) pair: TOPIC / "unsupported.config".
+        SupportedConfigChecker rejectOneTopicConfig = (type, name) ->
+            type != ConfigResource.Type.TOPIC || !name.equals("unsupported.config");
+
+        MockMetadataUpdater updater = new MockMetadataUpdater();
+        MetadataBatchLoader batchLoader = new MetadataBatchLoader(
+            new LogContext(),
+            new MockTime(),
+            new MockFaultHandler("testUnsupportedConfigFilteredInBatch"),
+            updater,
+            rejectOneTopicConfig
+        );
+
+        // One batch: the topic itself, a rejected config, then an accepted config.
+        ApiMessageAndVersion topicRecord = new ApiMessageAndVersion(new TopicRecord()
+            .setName("foo")
+            .setTopicId(TOPIC_FOO), (short) 0);
+        ApiMessageAndVersion rejectedConfig = new ApiMessageAndVersion(new ConfigRecord()
+            .setResourceType(ConfigResource.Type.TOPIC.id())
+            .setResourceName("foo")
+            .setName("unsupported.config")
+            .setValue("some-value"), (short) 0);
+        ApiMessageAndVersion acceptedConfig = new ApiMessageAndVersion(new ConfigRecord()
+            .setResourceType(ConfigResource.Type.TOPIC.id())
+            .setResourceName("foo")
+            .setName("retention.ms")
+            .setValue("1000"), (short) 0);
+        List<ApiMessageAndVersion> records = List.of(topicRecord, rejectedConfig, acceptedConfig);
+
+        batchLoader.resetToImage(MetadataImage.EMPTY);
+        batchLoader.loadBatch(Batch.data(10, 42, 0, 100, records), LEADER_AND_EPOCH);
+        batchLoader.maybeFlushBatches(LEADER_AND_EPOCH, true);
+
+        assertEquals(1, updater.updates);
+        ConfigResource fooTopic = new ConfigResource(ConfigResource.Type.TOPIC, "foo");
+        Properties publishedProps = updater.latestImage.configs().configProperties(fooTopic);
+        assertFalse(publishedProps.containsKey("unsupported.config"));
+        assertEquals("1000", publishedProps.getProperty("retention.ms"));
+    }
+
     @Test
     public void testTransactionAlignmentOnBatchBoundary() {
         List<ApiMessageAndVersion> batchRecords = new ArrayList<>();
diff --git a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
index b6069c5de75..acbc47cac00 100644
--- a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
@@ -18,10 +18,12 @@
 package org.apache.kafka.image.loader;
 
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.message.KRaftVersionRecord;
 import org.apache.kafka.common.message.SnapshotHeaderRecord;
 import org.apache.kafka.common.metadata.AbortTransactionRecord;
 import org.apache.kafka.common.metadata.BeginTransactionRecord;
+import org.apache.kafka.common.metadata.ConfigRecord;
 import org.apache.kafka.common.metadata.EndTransactionRecord;
 import org.apache.kafka.common.metadata.FeatureLevelRecord;
 import org.apache.kafka.common.metadata.PartitionRecord;
@@ -32,6 +34,7 @@ import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.image.MetadataProvenance;
 import org.apache.kafka.image.publisher.MetadataPublisher;
+import org.apache.kafka.metadata.SupportedConfigChecker;
 import org.apache.kafka.raft.Batch;
 import org.apache.kafka.raft.BatchReader;
 import org.apache.kafka.raft.ControlRecord;
@@ -924,4 +927,76 @@ public class MetadataLoaderTest {
         }
         faultHandler.maybeRethrowFirstException();
     }
+
+    @Test
+    public void testUnsupportedConfigFilteredInCommit() throws Exception {
+        // Reject only "message.format.version"; every other config name is accepted.
+        SupportedConfigChecker checker = (type, name) -> !name.equals("message.format.version");
+        ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "test-topic");
+        MockFaultHandler faultHandler = new MockFaultHandler("testUnsupportedConfigFilteredInCommit");
+        MockPublisher publisher = new MockPublisher("testUnsupportedConfigFilteredInCommit");
+
+        try (MetadataLoader loader = new MetadataLoader.Builder().
+                setFaultHandler(faultHandler).
+                setHighWaterMarkAccessor(() -> OptionalLong.of(1L)).
+                setSupportedConfigChecker(checker).
+                build()) {
+            loader.installPublishers(List.of(publisher)).get();
+            loadTestSnapshot(loader, 100);
+            publisher.firstPublish.get(10, TimeUnit.SECONDS);
+
+            // The accepted "retention.ms" record proves the batch was applied rather than skipped.
+            loader.handleCommit(MockBatchReader.newSingleBatchReader(200, 100, List.of(
+                new ApiMessageAndVersion(new ConfigRecord().
+                    setResourceType(resource.type().id()).setResourceName(resource.name()).
+                    setName("message.format.version").setValue("2.8"), (short) 0),
+                new ApiMessageAndVersion(new ConfigRecord().
+                    setResourceType(resource.type().id()).setResourceName(resource.name()).
+                    setName("retention.ms").setValue("1000"), (short) 0)
+            )));
+            loader.waitForAllEventsToBeHandled();
+
+            // Only the supported config may reach the published image.
+            assertEquals(1, publisher.latestImage.configs().configMapForResource(resource).size());
+            assertEquals("1000", publisher.latestImage.configs().configMapForResource(resource).get("retention.ms"));
+        }
+        faultHandler.maybeRethrowFirstException();
+    }
+
+    @Test
+    public void testUnsupportedConfigFilteredInSnapshot() throws Exception {
+        final String testName = "testUnsupportedConfigFilteredInSnapshot";
+        ConfigResource testTopic = new ConfigResource(ConfigResource.Type.TOPIC, "test-topic");
+        // Every config name except "message.format.version" is accepted.
+        SupportedConfigChecker checker = (type, name) -> !name.equals("message.format.version");
+        MockFaultHandler faultHandler = new MockFaultHandler(testName);
+        MockPublisher publisher = new MockPublisher(testName);
+
+        // Snapshot contents: a FeatureLevelRecord for MetadataVersion.FEATURE_NAME, then the rejected config.
+        ApiMessageAndVersion versionRecord = new ApiMessageAndVersion(new FeatureLevelRecord().
+            setName(MetadataVersion.FEATURE_NAME).
+            setFeatureLevel(MINIMUM_VERSION.featureLevel()), (short) 0);
+        ApiMessageAndVersion rejectedConfig = new ApiMessageAndVersion(new ConfigRecord().
+            setResourceType(ConfigResource.Type.TOPIC.id()).
+            setResourceName("test-topic").
+            setName("message.format.version").
+            setValue("2.8"), (short) 0);
+
+        try (MetadataLoader loader = new MetadataLoader.Builder().
+                setFaultHandler(faultHandler).
+                setHighWaterMarkAccessor(() -> OptionalLong.of(0L)).
+                setSupportedConfigChecker(checker).
+                build()) {
+            loader.installPublishers(List.of(publisher)).get();
+
+            loader.handleLoadSnapshot(MockSnapshotReader.fromRecordLists(
+                new MetadataProvenance(200, 100, 4000, true),
+                List.of(List.of(versionRecord), List.of(rejectedConfig))));
+            loader.waitForAllEventsToBeHandled();
+
+            // The published image must hold no configs for the topic.
+            assertTrue(publisher.latestImage.configs().configMapForResource(testTopic).isEmpty());
+        }
+        faultHandler.maybeRethrowFirstException();
+    }
 }
diff --git a/server/src/main/java/org/apache/kafka/server/config/DefaultSupportedConfigChecker.java b/server/src/main/java/org/apache/kafka/server/config/DefaultSupportedConfigChecker.java
new file mode 100644
index 00000000000..301d9e1d6bf
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/config/DefaultSupportedConfigChecker.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.coordinator.group.GroupConfig;
+import org.apache.kafka.metadata.SupportedConfigChecker;
+import org.apache.kafka.server.metrics.ClientMetricsConfigs;
+import org.apache.kafka.storage.internals.log.LogConfig;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+
+/**
+ * {@link SupportedConfigChecker} that decides per resource type whether a config name is supported.
+ *
+ * <p>One predicate over config names is kept for each of these resource types:
+ * <ul>
+ *   <li>TOPIC: names returned by {@code LogConfig.configNames()}.</li>
+ *   <li>BROKER: every name is accepted. Broker configs include listener-specific prefixed
+ *       configs (e.g., listener.name.&lt;name&gt;.ssl.keystore.location) whose names are
+ *       user-defined at runtime and cannot be pre-enumerated, and plugin-defined configs
+ *       (e.g., custom authorizer or quota callback configs) with arbitrary names.</li>
+ *   <li>CLIENT_METRICS: names defined by {@code ClientMetricsConfigs.configDef()}.</li>
+ *   <li>GROUP: names defined by {@code GroupConfig.configDef()}.</li>
+ * </ul>
+ * Config names for any other resource type are reported as unsupported.
+ */
+public final class DefaultSupportedConfigChecker implements SupportedConfigChecker {
+    /** Accepts a name only when the wrapped set contains it. */
+    static final class SetContainsPredicate implements Predicate<String> {
+        private final Set<String> allowedNames;
+        SetContainsPredicate(Set<String> keys) {
+            allowedNames = keys;
+        }
+        @Override
+        public boolean test(String key) {
+            return allowedNames.contains(key);
+        }
+    }
+
+    private final Map<ConfigResource.Type, Predicate<String>> predicatesByType;
+
+    public DefaultSupportedConfigChecker() {
+        Predicate<String> topicNames = new SetContainsPredicate(new HashSet<>(LogConfig.configNames()));
+        Predicate<String> clientMetricsNames = new SetContainsPredicate(ClientMetricsConfigs.configDef().names());
+        Predicate<String> groupNames = new SetContainsPredicate(GroupConfig.configDef().names());
+        predicatesByType = Map.of(
+            ConfigResource.Type.TOPIC, topicNames,
+            ConfigResource.Type.BROKER, name -> true,
+            ConfigResource.Type.CLIENT_METRICS, clientMetricsNames,
+            ConfigResource.Type.GROUP, groupNames);
+    }
+
+    @Override
+    public boolean isSupported(ConfigResource.Type resourceType, String configName) {
+        // A resource type without a registered predicate supports no config names.
+        return predicatesByType.getOrDefault(resourceType, name -> false).test(configName);
+    }
+}
diff --git a/server/src/test/java/org/apache/kafka/server/config/DefaultSupportedConfigCheckerTest.java b/server/src/test/java/org/apache/kafka/server/config/DefaultSupportedConfigCheckerTest.java
new file mode 100644
index 00000000000..b62626e42a6
--- /dev/null
+++ b/server/src/test/java/org/apache/kafka/server/config/DefaultSupportedConfigCheckerTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.coordinator.group.GroupConfig;
+import org.apache.kafka.server.metrics.ClientMetricsConfigs;
+
+import org.junit.jupiter.api.Test;
+
+import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
+import static 
org.apache.kafka.common.config.ConfigResource.Type.CLIENT_METRICS;
+import static org.apache.kafka.common.config.ConfigResource.Type.GROUP;
+import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class DefaultSupportedConfigCheckerTest {
+    private final DefaultSupportedConfigChecker checker = new DefaultSupportedConfigChecker();
+
+    @Test
+    void testIsSupported() {
+        // TOPIC: known topic configs, including the replication throttled replica lists
+        assertTrue(checker.isSupported(TOPIC, TopicConfig.SEGMENT_BYTES_CONFIG));
+        assertTrue(checker.isSupported(TOPIC, TopicConfig.SEGMENT_MS_CONFIG));
+        assertTrue(checker.isSupported(TOPIC, TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG));
+        assertTrue(checker.isSupported(TOPIC, QuotaConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG));
+        assertTrue(checker.isSupported(TOPIC, QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG));
+        assertFalse(checker.isSupported(TOPIC, "invalid.topic.config"));
+        // message.format.version was removed in 4.0 (KIP-724) and must be reported as unsupported
+        assertFalse(checker.isSupported(TOPIC, "message.format.version"));
+
+        // BROKER allows all config names, including listener-specific prefixed configs and plugin-defined
+        // configs (e.g., custom authorizer or quota callback configs) that cannot be pre-enumerated.
+        assertTrue(checker.isSupported(BROKER, "log.cleaner.threads"));
+        assertTrue(checker.isSupported(BROKER, "num.network.threads"));
+        assertTrue(checker.isSupported(BROKER, "log.segment.bytes"));
+        assertTrue(checker.isSupported(BROKER, "listener.name.EXTERNAL.ssl.keystore.location"));
+        assertTrue(checker.isSupported(BROKER, "fake.configurable.authorizer.foobar.config"));
+        assertTrue(checker.isSupported(BROKER, QuotaConfig.LEADER_REPLICATION_THROTTLED_RATE_CONFIG));
+        assertTrue(checker.isSupported(BROKER, QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG));
+        assertTrue(checker.isSupported(BROKER, QuotaConfig.REPLICA_ALTER_LOG_DIRS_IO_MAX_BYTES_PER_SECOND_CONFIG));
+
+        // CLIENT_METRICS: only names defined by ClientMetricsConfigs
+        assertTrue(checker.isSupported(CLIENT_METRICS, ClientMetricsConfigs.INTERVAL_MS_CONFIG));
+        assertTrue(checker.isSupported(CLIENT_METRICS, ClientMetricsConfigs.METRICS_CONFIG));
+        assertTrue(checker.isSupported(CLIENT_METRICS, ClientMetricsConfigs.MATCH_CONFIG));
+        assertFalse(checker.isSupported(CLIENT_METRICS, "invalid.client.metrics.config"));
+
+        // GROUP: only names defined by GroupConfig
+        assertTrue(checker.isSupported(GROUP, GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG));
+        assertTrue(checker.isSupported(GROUP, GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG));
+        assertFalse(checker.isSupported(GROUP, "invalid.group.config"));
+
+        // A resource type the checker holds no predicate for supports nothing, even a valid topic config name
+        assertFalse(checker.isSupported(org.apache.kafka.common.config.ConfigResource.Type.UNKNOWN, TopicConfig.SEGMENT_BYTES_CONFIG));
+    }
+}
diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
index cf5c51590a5..0e52472fb64 100644
--- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
+++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
@@ -2125,7 +2125,9 @@ class ShareCoordinatorServiceTest {
             .build();
 
         // Create a delta that deletes the topic.
-        MetadataDelta delta = new MetadataDelta(initialImage);
+        MetadataDelta delta = new MetadataDelta.Builder()
+            .setImage(initialImage)
+            .build();
         delta.replay(new RemoveTopicRecord().setTopicId(topicId));
         MetadataImage newImage = delta.apply(new MetadataProvenance(1, 0, 0L, 
true));
 
@@ -2172,7 +2174,9 @@ class ShareCoordinatorServiceTest {
         MetadataImage image = new MetadataImageBuilder()
             .addTopic(Uuid.randomUuid(), "foo", 1)
             .build();
-        MetadataDelta delta = new MetadataDelta(image);
+        MetadataDelta delta = new MetadataDelta.Builder()
+            .setImage(image)
+            .build();
 
         assertDoesNotThrow(() -> service.onMetadataUpdate(delta, image));
 
@@ -2245,7 +2249,9 @@ class ShareCoordinatorServiceTest {
             .build();
 
         // Create a delta that deletes the topic.
-        MetadataDelta delta = new MetadataDelta(initialImage);
+        MetadataDelta delta = new MetadataDelta.Builder()
+            .setImage(initialImage)
+            .build();
         delta.replay(new RemoveTopicRecord().setTopicId(topicId));
         MetadataImage newImage = delta.apply(new MetadataProvenance(1, 0, 0L, 
true));
 
diff --git a/shell/src/main/java/org/apache/kafka/shell/MetadataShell.java b/shell/src/main/java/org/apache/kafka/shell/MetadataShell.java
index 7af9557381d..0614d268f54 100644
--- a/shell/src/main/java/org/apache/kafka/shell/MetadataShell.java
+++ b/shell/src/main/java/org/apache/kafka/shell/MetadataShell.java
@@ -22,7 +22,9 @@ import kafka.tools.TerseFailure;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.image.loader.MetadataLoader;
+import org.apache.kafka.metadata.SupportedConfigChecker;
 import org.apache.kafka.metadata.util.SnapshotFileReader;
+import org.apache.kafka.server.config.DefaultSupportedConfigChecker;
 import org.apache.kafka.server.fault.FaultHandler;
 import org.apache.kafka.server.fault.LoggingFaultHandler;
 import org.apache.kafka.server.util.FileLock;
@@ -148,10 +150,13 @@ public final class MetadataShell {
 
     private void initializeWithSnapshotFileReader() throws Exception {
         this.fileLock = takeDirectoryLockIfExists(parentParent(new 
File(snapshotPath)));
+        SupportedConfigChecker supportedConfigChecker = new 
DefaultSupportedConfigChecker();
+
         this.loader = new MetadataLoader.Builder().
                 setFaultHandler(faultHandler).
                 setNodeId(-1).
                 setHighWaterMarkAccessor(() -> 
snapshotFileReader.highWaterMark()).
+                setSupportedConfigChecker(supportedConfigChecker).
                 build();
         snapshotFileReader = new SnapshotFileReader(snapshotPath, loader);
         snapshotFileReader.startup();


Reply via email to