This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d8978ad013b MINOR: Replace Properties with Map in 
ControllerConfigurationValidator validation path (#21481)
d8978ad013b is described below

commit d8978ad013b65c5dd16a0a820e5a154285bdea60
Author: majialong <[email protected]>
AuthorDate: Tue Mar 17 23:03:30 2026 +0800

    MINOR: Replace Properties with Map in ControllerConfigurationValidator 
validation path (#21481)
    
    Refactor `ControllerConfigurationValidator` and downstream validation
    methods to use `Map` instead of `Properties`. Also add explicit
    null-value validation for `CLIENT_METRICS` configs, which previously
    relied on `Properties` throwing `NullPointerException` for null values.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../server/ControllerConfigurationValidator.scala  | 60 ++++++++++------------
 .../test/scala/unit/kafka/log/LogConfigTest.scala  | 22 ++++----
 .../ControllerConfigurationValidatorTest.scala     | 10 ++++
 .../kafka/coordinator/group/GroupConfig.java       | 13 ++---
 .../kafka/coordinator/group/GroupConfigTest.java   | 18 +++----
 .../apache/kafka/raft/internals/KafkaRaftLog.java  | 17 +++---
 .../kafka/server/metrics/ClientMetricsConfigs.java | 16 +++---
 .../kafka/storage/internals/log/LogConfig.java     | 34 ++++++------
 .../java/org/apache/kafka/tools/TopicCommand.java  | 15 +++---
 9 files changed, 104 insertions(+), 101 deletions(-)

diff --git 
a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala 
b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
index 89fdb1d4242..7fdf7b00a98 100644
--- a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
+++ b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
@@ -18,7 +18,6 @@
 package kafka.server
 
 import java.util
-import java.util.Properties
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, 
CLIENT_METRICS, GROUP, TOPIC}
 import org.apache.kafka.controller.ConfigurationValidator
@@ -78,6 +77,28 @@ class ControllerConfigurationValidator(kafkaConfig: 
KafkaConfig) extends Configu
     }
   }
 
+  private def filterAndValidateNullConfigs(
+    newConfigs: util.Map[String, String],
+    resourceTypeName: String
+  ): util.HashMap[String, String] = {
+    val filteredConfigs = new util.HashMap[String, String]()
+    val nullConfigs = new mutable.ArrayBuffer[String]()
+
+    newConfigs.forEach((key, value) => {
+      if (value == null) {
+        nullConfigs += key
+      } else {
+        filteredConfigs.put(key, value)
+      }
+    })
+    if (nullConfigs.nonEmpty) {
+      throw new InvalidConfigurationException(s"Null value not supported for 
$resourceTypeName configs: " +
+        nullConfigs.mkString(","))
+    }
+
+    filteredConfigs
+  }
+
   private def throwExceptionForUnknownResourceType(
     resource: ConfigResource
   ): Unit = {
@@ -104,42 +125,17 @@ class ControllerConfigurationValidator(kafkaConfig: 
KafkaConfig) extends Configu
     resource.`type`() match {
       case TOPIC =>
         validateTopicName(resource.name())
-        val properties = new Properties()
-        val nullTopicConfigs = new mutable.ArrayBuffer[String]()
-        newConfigs.forEach((key, value) => {
-          if (value == null) {
-            nullTopicConfigs += key
-          } else {
-            properties.setProperty(key, value)
-          }
-        })
-        if (nullTopicConfigs.nonEmpty) {
-          throw new InvalidConfigurationException("Null value not supported 
for topic configs: " +
-            nullTopicConfigs.mkString(","))
-        }
-        LogConfig.validate(oldConfigs, properties, 
kafkaConfig.extractLogConfigMap,
+        val filteredConfigs = filterAndValidateNullConfigs(newConfigs, "topic")
+        LogConfig.validate(oldConfigs, filteredConfigs, 
kafkaConfig.extractLogConfigMap,
           kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
       case BROKER => validateBrokerName(resource.name())
       case CLIENT_METRICS =>
-        val properties = new Properties()
-        newConfigs.forEach((key, value) => properties.setProperty(key, value))
-        ClientMetricsConfigs.validate(resource.name(), properties)
+        val filteredConfigs = filterAndValidateNullConfigs(newConfigs, "client 
metrics")
+        ClientMetricsConfigs.validate(resource.name(), filteredConfigs)
       case GROUP =>
         validateGroupName(resource.name())
-        val properties = new Properties()
-        val nullGroupConfigs = new mutable.ArrayBuffer[String]()
-        newConfigs.forEach((key, value) => {
-          if (value == null) {
-            nullGroupConfigs += key
-          } else {
-            properties.setProperty(key, value)
-          }
-        })
-        if (nullGroupConfigs.nonEmpty) {
-          throw new InvalidConfigurationException("Null value not supported 
for group configs: " +
-            nullGroupConfigs.mkString(","))
-        }
-        GroupConfig.validate(properties, kafkaConfig.groupCoordinatorConfig, 
kafkaConfig.shareGroupConfig)
+        val filteredConfigs = filterAndValidateNullConfigs(newConfigs, "group")
+        GroupConfig.validate(filteredConfigs, 
kafkaConfig.groupCoordinatorConfig, kafkaConfig.shareGroupConfig)
       case _ => throwExceptionForUnknownResourceType(resource)
     }
   }
diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala 
b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
index 98a49070913..ecef38cfbe7 100644
--- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -81,9 +81,9 @@ class LogConfigTest {
 
   @Test
   def testInvalidCompactionLagConfig(): Unit = {
-    val props = new Properties
-    props.setProperty(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG, "100")
-    props.setProperty(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, "200")
+    val props = new util.HashMap[String, String]
+    props.put(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG, "100")
+    props.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, "200")
     assertThrows(classOf[Exception], () => LogConfig.validate(props))
   }
 
@@ -265,7 +265,7 @@ class LogConfigTest {
     
kafkaProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, 
"true")
     val kafkaConfig = KafkaConfig.fromProps(kafkaProps)
 
-    val props = new Properties()
+    val props = new util.HashMap[String, String]()
     props.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
     props.put(TopicConfig.RETENTION_BYTES_CONFIG, retentionBytes.toString)
     props.put(TopicConfig.RETENTION_MS_CONFIG, retentionMs.toString)
@@ -281,7 +281,7 @@ class LogConfigTest {
     val kafkaProps = TestUtils.createDummyBrokerConfig()
     
kafkaProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, 
"true")
     val kafkaConfig = KafkaConfig.fromProps(kafkaProps)
-    val logProps = new Properties()
+    val logProps = new util.HashMap[String, String]()
     def validateCleanupPolicy(): Unit = {
       LogConfig.validate(util.Map.of, logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled)
     }
@@ -307,7 +307,7 @@ class LogConfigTest {
     
kafkaProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, 
sysRemoteStorageEnabled.toString)
     val kafkaConfig = KafkaConfig.fromProps(kafkaProps)
 
-    val logProps = new Properties()
+    val logProps = new util.HashMap[String, String]()
     logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
     if (sysRemoteStorageEnabled) {
       LogConfig.validate(util.Map.of, logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled)
@@ -325,7 +325,7 @@ class LogConfigTest {
     
kafkaProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, 
"true")
     val kafkaConfig = KafkaConfig.fromProps(kafkaProps)
 
-    val logProps = new Properties()
+    val logProps = new util.HashMap[String, String]()
     logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false")
     if (wasRemoteStorageEnabled) {
       val message = assertThrows(classOf[InvalidConfigurationException],
@@ -357,7 +357,7 @@ class LogConfigTest {
     val kafkaConfig = KafkaConfig.fromProps(kafkaProps)
 
     // Topic local log retention time inherited from Broker is greater than 
the topic's complete log retention time
-    val logProps = new Properties()
+    val logProps = new util.HashMap[String, String]()
     logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, 
sysRemoteStorageEnabled.toString)
     logProps.put(TopicConfig.RETENTION_MS_CONFIG, "500")
     if (sysRemoteStorageEnabled) {
@@ -381,7 +381,7 @@ class LogConfigTest {
     val kafkaConfig = KafkaConfig.fromProps(props)
 
     // Topic local retention size inherited from Broker is greater than the 
topic's complete log retention size
-    val logProps = new Properties()
+    val logProps = new util.HashMap[String, String]()
     logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, 
sysRemoteStorageEnabled.toString)
     logProps.put(TopicConfig.RETENTION_BYTES_CONFIG, "128")
     if (sysRemoteStorageEnabled) {
@@ -416,7 +416,7 @@ class LogConfigTest {
   @ParameterizedTest
   @ValueSource(booleans = Array(true, false))
   def testValidRemoteLogCopyDisabled(copyDisabled: Boolean): Unit = {
-    val logProps = new Properties
+    val logProps = new util.HashMap[String, String]
     logProps.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, 
copyDisabled.toString)
     LogConfig.validate(logProps)
   }
@@ -424,7 +424,7 @@ class LogConfigTest {
   @ParameterizedTest
   @ValueSource(booleans = Array(true, false))
   def testValidRemoteLogDeleteOnDisable(deleteOnDisable: Boolean): Unit = {
-    val logProps = new Properties
+    val logProps = new util.HashMap[String, String]
     logProps.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, 
deleteOnDisable.toString)
     LogConfig.validate(logProps)
   }
diff --git 
a/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
 
b/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
index 3056753f53b..9e99f01c0e1 100644
--- 
a/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
@@ -202,6 +202,16 @@ class ControllerConfigurationValidatorTest {
         new ConfigResource(GROUP, "group"), config, emptyMap())).getMessage)
   }
 
+  @Test
+  def testNullClientMetricsConfigValue(): Unit = {
+    val config = new util.TreeMap[String, String]()
+    config.put(ClientMetricsConfigs.INTERVAL_MS_CONFIG, "2000")
+    config.put(ClientMetricsConfigs.MATCH_CONFIG, null)
+    assertEquals("Null value not supported for client metrics configs: match",
+      assertThrows(classOf[InvalidConfigurationException], () => 
validator.validate(
+        new ConfigResource(CLIENT_METRICS, "subscription-1"), config, 
emptyMap())).getMessage)
+  }
+
   @Test
   def testInvalidGroupConfig(): Unit = {
     val config = new util.TreeMap[String, String]()
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
index e0b52574fc3..db2cc128284 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
@@ -27,6 +27,7 @@ import 
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
@@ -237,9 +238,9 @@ public final class GroupConfig extends AbstractConfig {
     /**
      * Check that property names are valid
      */
-    public static void validateNames(Properties props) {
+    public static void validateNames(Map<String, ?> props) {
         Set<String> names = configNames();
-        for (String name : props.stringPropertyNames()) {
+        for (String name : props.keySet()) {
             if (!names.contains(name)) {
                 throw new InvalidConfigurationException("Unknown group config 
name: " + name);
             }
@@ -250,7 +251,7 @@ public final class GroupConfig extends AbstractConfig {
      * Validates the values of the given properties.
      */
     @SuppressWarnings({"CyclomaticComplexity", "NPathComplexity"})
-    private static void validateValues(Map<?, ?> valueMaps, 
GroupCoordinatorConfig groupCoordinatorConfig, ShareGroupConfig 
shareGroupConfig) {
+    private static void validateValues(Map<String, ?> valueMaps, 
GroupCoordinatorConfig groupCoordinatorConfig, ShareGroupConfig 
shareGroupConfig) {
         int consumerHeartbeatInterval = (Integer) 
valueMaps.get(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG);
         int consumerSessionTimeout = (Integer) 
valueMaps.get(CONSUMER_SESSION_TIMEOUT_MS_CONFIG);
         int shareHeartbeatInterval = (Integer) 
valueMaps.get(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG);
@@ -356,13 +357,13 @@ public final class GroupConfig extends AbstractConfig {
      * all values can be parsed and are valid. The provided properties are 
merged with
      * the broker-level defaults before validation.
      */
-    public static void validate(Properties props, GroupCoordinatorConfig 
groupCoordinatorConfig, ShareGroupConfig shareGroupConfig) {
-        Properties combinedConfigs = new Properties();
+    public static void validate(Map<String, ?> props, GroupCoordinatorConfig 
groupCoordinatorConfig, ShareGroupConfig shareGroupConfig) {
+        Map<String, Object> combinedConfigs = new HashMap<>();
         
combinedConfigs.putAll(groupCoordinatorConfig.extractGroupConfigMap(shareGroupConfig));
         combinedConfigs.putAll(props);
 
         validateNames(combinedConfigs);
-        Map<?, ?> valueMaps = CONFIG.parse(combinedConfigs);
+        Map<String, Object> valueMaps = CONFIG.parse(combinedConfigs);
         validateValues(valueMaps, groupCoordinatorConfig, shareGroupConfig);
     }
 
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
index bd7bec0e98f..9b28b591574 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
@@ -120,7 +120,7 @@ public class GroupConfigTest {
     @Test
     public void testValidShareAutoOffsetResetValues() {
 
-        Properties props = createValidGroupConfig();
+        Map<String, String> props = createValidGroupConfig();
 
         // Check for value "latest"
         props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "latest");
@@ -139,7 +139,7 @@ public class GroupConfigTest {
     @Test
     public void testValidShareIsolationLevelValues() {
         // Check for value READ_UNCOMMITTED
-        Properties props = createValidGroupConfig();
+        Map<String, String> props = createValidGroupConfig();
         props.put(GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG, "read_committed");
         doTestValidProps(props);
 
@@ -152,7 +152,7 @@ public class GroupConfigTest {
     @Test
     public void testInvalidProps() {
 
-        Properties props = createValidGroupConfig();
+        Map<String, String> props = createValidGroupConfig();
 
         // Check for invalid consumerSessionTimeoutMs, < MIN
         props.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "1");
@@ -276,11 +276,11 @@ public class GroupConfigTest {
         doTestInvalidProps(props, ConfigException.class);
     }
 
-    private void doTestInvalidProps(Properties props, Class<? extends 
Exception> exceptionClassName) {
+    private void doTestInvalidProps(Map<String, String> props, Class<? extends 
Exception> exceptionClassName) {
         assertThrows(exceptionClassName, () -> GroupConfig.validate(props, 
createGroupCoordinatorConfig(), createShareGroupConfig()));
     }
 
-    private void doTestValidProps(Properties props) {
+    private void doTestValidProps(Map<String, String> props) {
         assertDoesNotThrow(() -> GroupConfig.validate(props, 
createGroupCoordinatorConfig(), createShareGroupConfig()));
     }
 
@@ -324,7 +324,7 @@ public class GroupConfigTest {
 
     @Test
     public void testInvalidConfigName() {
-        Properties props = new Properties();
+        Map<String, String> props = new HashMap<>();
         props.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "10");
         props.put("invalid.config.name", "10");
         assertThrows(InvalidConfigurationException.class, () -> 
GroupConfig.validate(props, createGroupCoordinatorConfig(), 
createShareGroupConfig()));
@@ -347,7 +347,7 @@ public class GroupConfigTest {
         ShareGroupConfig shareGroupConfig = 
ShareGroupConfig.fromProps(overrides);
 
         assertDoesNotThrow(() ->
-            GroupConfig.validate(new Properties(), groupCoordinatorConfig, 
shareGroupConfig));
+            GroupConfig.validate(new HashMap<>(), groupCoordinatorConfig, 
shareGroupConfig));
     }
 
     @Test
@@ -487,8 +487,8 @@ public class GroupConfigTest {
         assertEquals(expectedMax, result.get(key));
     }
 
-    private Properties createValidGroupConfig() {
-        Properties props = new Properties();
+    private Map<String, String> createValidGroupConfig() {
+        Map<String, String> props = new HashMap<>();
         props.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "45000");
         props.put(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, "5000");
         props.put(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG, "45000");
diff --git 
a/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftLog.java 
b/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftLog.java
index 5b0ca350542..b168dc66c46 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftLog.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftLog.java
@@ -72,7 +72,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Optional;
-import java.util.Properties;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
@@ -674,19 +673,19 @@ public class KafkaRaftLog implements RaftLog {
             Scheduler scheduler,
             MetadataLogConfig config,
             int nodeId) throws IOException {
-        Properties props = new Properties();
-        props.setProperty(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, 
String.valueOf(config.internalMaxBatchSizeInBytes()));
+        Map<String, String> props = new HashMap<>();
+        props.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, 
String.valueOf(config.internalMaxBatchSizeInBytes()));
         if (config.internalSegmentBytes() != null) {
-            props.setProperty(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 
String.valueOf(config.internalSegmentBytes()));
+            props.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 
String.valueOf(config.internalSegmentBytes()));
         } else {
-            props.setProperty(TopicConfig.SEGMENT_BYTES_CONFIG, 
String.valueOf(config.logSegmentBytes()));
+            props.put(TopicConfig.SEGMENT_BYTES_CONFIG, 
String.valueOf(config.logSegmentBytes()));
         }
-        props.setProperty(TopicConfig.SEGMENT_MS_CONFIG, 
String.valueOf(config.logSegmentMillis()));
-        props.setProperty(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, 
String.valueOf(ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT));
+        props.put(TopicConfig.SEGMENT_MS_CONFIG, 
String.valueOf(config.logSegmentMillis()));
+        props.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, 
String.valueOf(ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT));
 
         // Disable time and byte retention when deleting segments
-        props.setProperty(TopicConfig.RETENTION_MS_CONFIG, "-1");
-        props.setProperty(TopicConfig.RETENTION_BYTES_CONFIG, "-1");
+        props.put(TopicConfig.RETENTION_MS_CONFIG, "-1");
+        props.put(TopicConfig.RETENTION_BYTES_CONFIG, "-1");
         LogConfig.validate(props);
         LogConfig defaultLogConfig = new LogConfig(props);
 
diff --git 
a/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java
 
b/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java
index b7a66df7b16..90a36d5d3e7 100644
--- 
a/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java
+++ 
b/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java
@@ -140,27 +140,27 @@ public class ClientMetricsConfigs extends AbstractConfig {
         return CONFIG.names();
     }
 
-    public static void validate(String subscriptionName, Properties 
properties) {
+    public static void validate(String subscriptionName, Map<?, ?> props) {
         if (subscriptionName == null || subscriptionName.isEmpty()) {
             throw new InvalidRequestException("Subscription name can't be 
empty");
         }
 
-        validateProperties(properties);
+        validateConfigs(props);
     }
 
     @SuppressWarnings("unchecked")
-    private static void validateProperties(Properties properties) {
-        // Make sure that all the properties are valid
-        properties.forEach((key, value) -> {
+    private static void validateConfigs(Map<?, ?> configs) {
+        // Make sure that all the configs are valid
+        configs.forEach((key, value) -> {
             if (!names().contains(key)) {
                 throw new InvalidRequestException("Unknown client metrics 
configuration: " + key);
             }
         });
 
-        Map<String, Object> parsed = CONFIG.parse(properties);
+        Map<String, Object> parsed = CONFIG.parse(configs);
 
         // Make sure that push interval is between 100ms and 1 hour.
-        if (properties.containsKey(INTERVAL_MS_CONFIG)) {
+        if (configs.containsKey(INTERVAL_MS_CONFIG)) {
             int pushIntervalMs = (Integer) parsed.get(INTERVAL_MS_CONFIG);
             if (pushIntervalMs < MIN_INTERVAL_MS || pushIntervalMs > 
MAX_INTERVAL_MS) {
                 String msg = String.format("Invalid value %s for %s, interval 
must be between 100 and 3600000 (1 hour)",
@@ -170,7 +170,7 @@ public class ClientMetricsConfigs extends AbstractConfig {
         }
 
         // Make sure that client match patterns are valid by parsing them.
-        if (properties.containsKey(MATCH_CONFIG)) {
+        if (configs.containsKey(MATCH_CONFIG)) {
             List<String> patterns = (List<String>) parsed.get(MATCH_CONFIG);
             // Parse the client matching patterns to validate if the patterns 
are valid.
             parseMatchingPatterns(patterns);
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
index f81d224e7ea..4276ac71b3d 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
@@ -455,9 +455,9 @@ public class LogConfig extends AbstractConfig {
     /**
      * Check that property names are valid
      */
-    public static void validateNames(Properties props) {
+    public static void validateNames(Map<String, String> props) {
         List<String> names = configNames();
-        for (Object name : props.keySet())
+        for (String name : props.keySet())
             if (!names.contains(name))
                 throw new InvalidConfigurationException("Unknown topic config 
name: " + name);
     }
@@ -468,7 +468,7 @@ public class LogConfig extends AbstractConfig {
      * LogConfig class.
      * @param props The properties to be validated
      */
-    public static void validateValues(Map<?, ?> props) {
+    public static void validateValues(Map<String, ?> props) {
         long minCompactionLag = (Long) 
props.get(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG);
         long maxCompactionLag = (Long) 
props.get(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG);
         if (minCompactionLag > maxCompactionLag) {
@@ -484,7 +484,7 @@ public class LogConfig extends AbstractConfig {
      * The default values should be extracted from the KafkaConfig.
      * @param props The properties to be validated
      */
-    public static void validateBrokerLogConfigValues(Map<?, ?> props,
+    public static void validateBrokerLogConfigValues(Map<String, ?> props,
                                                      boolean 
isRemoteLogStorageSystemEnabled) {
         validateValues(props);
         if (isRemoteLogStorageSystemEnabled) {
@@ -502,7 +502,7 @@ public class LogConfig extends AbstractConfig {
      * @param isRemoteLogStorageSystemEnabled   true if system wise remote log 
storage is enabled
      */
     private static void validateTopicLogConfigValues(Map<String, String> 
existingConfigs,
-                                                     Map<?, ?> newConfigs,
+                                                     Map<String, ?> newConfigs,
                                                      boolean 
isRemoteLogStorageSystemEnabled) {
         validateValues(newConfigs);
 
@@ -520,7 +520,7 @@ public class LogConfig extends AbstractConfig {
         }
     }
 
-    public static void validateTurningOffRemoteStorageWithDelete(Map<?, ?> 
newConfigs, boolean wasRemoteLogEnabled, boolean isRemoteLogStorageEnabled) {
+    public static void validateTurningOffRemoteStorageWithDelete(Map<String, 
?> newConfigs, boolean wasRemoteLogEnabled, boolean isRemoteLogStorageEnabled) {
         boolean isRemoteLogDeleteOnDisable = (Boolean) 
Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG,
 false);
         if (wasRemoteLogEnabled && !isRemoteLogStorageEnabled && 
!isRemoteLogDeleteOnDisable) {
             throw new InvalidConfigurationException("It is invalid to disable 
remote storage without deleting remote data. " +
@@ -529,7 +529,7 @@ public class LogConfig extends AbstractConfig {
         }
     }
 
-    public static void validateRetentionConfigsWhenRemoteCopyDisabled(Map<?, 
?> newConfigs, boolean isRemoteLogStorageEnabled) {
+    public static void 
validateRetentionConfigsWhenRemoteCopyDisabled(Map<String, ?> newConfigs, 
boolean isRemoteLogStorageEnabled) {
         boolean isRemoteLogCopyDisabled = (Boolean) 
Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG,
 false);
         long retentionMs = (Long) 
newConfigs.get(TopicConfig.RETENTION_MS_CONFIG);
         long localRetentionMs = (Long) 
newConfigs.get(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG);
@@ -547,7 +547,7 @@ public class LogConfig extends AbstractConfig {
         }
     }
 
-    public static void validateRemoteStorageOnlyIfSystemEnabled(Map<?, ?> 
props, boolean isRemoteLogStorageSystemEnabled, boolean 
isReceivingConfigFromStore) {
+    public static void validateRemoteStorageOnlyIfSystemEnabled(Map<String, ?> 
props, boolean isRemoteLogStorageSystemEnabled, boolean 
isReceivingConfigFromStore) {
         boolean isRemoteLogStorageEnabled = (Boolean) 
props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
         if (isRemoteLogStorageEnabled && !isRemoteLogStorageSystemEnabled) {
             if (isReceivingConfigFromStore) {
@@ -560,14 +560,14 @@ public class LogConfig extends AbstractConfig {
     }
 
     @SuppressWarnings("unchecked")
-    private static void 
validateRemoteStorageRequiresDeleteCleanupPolicy(Map<?, ?> props) {
+    private static void 
validateRemoteStorageRequiresDeleteCleanupPolicy(Map<String, ?> props) {
         List<String> cleanupPolicy = (List<String>) 
props.get(TopicConfig.CLEANUP_POLICY_CONFIG);
         if (!cleanupPolicy.isEmpty() && (cleanupPolicy.size() != 1 || 
!TopicConfig.CLEANUP_POLICY_DELETE.equals(cleanupPolicy.get(0)))) {
             throw new ConfigException("Remote log storage only supports topics 
with cleanup.policy=delete or cleanup.policy being an empty list.");
         }
     }
 
-    private static void validateRemoteStorageRetentionSize(Map<?, ?> props) {
+    private static void validateRemoteStorageRetentionSize(Map<String, ?> 
props) {
         Long retentionBytes = (Long) 
props.get(TopicConfig.RETENTION_BYTES_CONFIG);
         Long localRetentionBytes = (Long) 
props.get(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG);
         if (retentionBytes > -1 && localRetentionBytes != -2) {
@@ -584,7 +584,7 @@ public class LogConfig extends AbstractConfig {
         }
     }
 
-    private static void validateRemoteStorageRetentionTime(Map<?, ?> props) {
+    private static void validateRemoteStorageRetentionTime(Map<String, ?> 
props) {
         Long retentionMs = (Long) props.get(TopicConfig.RETENTION_MS_CONFIG);
         Long localRetentionMs = (Long) 
props.get(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG);
         if (retentionMs != -1 && localRetentionMs != -2) {
@@ -604,22 +604,22 @@ public class LogConfig extends AbstractConfig {
     /**
      * Check that the given properties contain only valid log config names and 
that all values can be parsed and are valid
      */
-    public static void validate(Properties props) {
+    public static void validate(Map<String, String> props) {
         validate(Map.of(), props, Map.of(), false);
     }
 
     public static void validate(Map<String, String> existingConfigs,
-                                Properties props,
-                                Map<?, ?> configuredProps,
+                                Map<String, String> props,
+                                Map<String, ?> configuredProps,
                                 boolean isRemoteLogStorageSystemEnabled) {
         validateNames(props);
         if (configuredProps == null || configuredProps.isEmpty()) {
-            Map<?, ?> valueMaps = CONFIG.parse(props);
+            Map<String, ?> valueMaps = CONFIG.parse(props);
             validateValues(valueMaps);
         } else {
-            Map<Object, Object> combinedConfigs = new 
HashMap<>(configuredProps);
+            Map<String, Object> combinedConfigs = new 
HashMap<>(configuredProps);
             combinedConfigs.putAll(props);
-            Map<?, ?> valueMaps = CONFIG.parse(combinedConfigs);
+            Map<String, ?> valueMaps = CONFIG.parse(combinedConfigs);
             validateTopicLogConfigValues(existingConfigs, valueMaps, 
isRemoteLogStorageSystemEnabled);
         }
     }
diff --git a/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java 
b/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java
index 1d1d5c2f72c..691625b377e 100644
--- a/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java
@@ -60,6 +60,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
@@ -154,7 +155,7 @@ public abstract class TopicCommand {
         return ret;
     }
 
-    private static Properties parseTopicConfigsToBeAdded(TopicCommandOptions 
opts) {
+    private static Map<String, String> 
parseTopicConfigsToBeAdded(TopicCommandOptions opts) {
         List<List<String>> configsToBeAdded = 
opts.topicConfig().orElse(List.of())
             .stream()
             .map(s -> List.of(s.split("\\s*=\\s*")))
@@ -164,9 +165,8 @@ public abstract class TopicCommand {
             throw new IllegalArgumentException("requirement failed: Invalid 
topic config: all configs to be added must be in the format \"key=val\".");
         }
 
-        Properties props = new Properties();
-        configsToBeAdded.stream()
-            .forEach(pair -> props.setProperty(pair.get(0).trim(), 
pair.get(1).trim()));
+        Map<String, String> props = new HashMap<>();
+        configsToBeAdded.forEach(pair -> props.put(pair.get(0).trim(), 
pair.get(1).trim()));
         LogConfig.validate(props);
         return props;
     }
@@ -243,7 +243,7 @@ public abstract class TopicCommand {
         private final Optional<Integer> partitions;
         private final Optional<Integer> replicationFactor;
         private final Map<Integer, List<Integer>> replicaAssignment;
-        private final Properties configsToAdd;
+        private final Map<String, String> configsToAdd;
 
         private final TopicCommandOptions opts;
 
@@ -455,10 +455,7 @@ public abstract class TopicCommand {
                     ? new NewTopic(topic.name, topic.replicaAssignment)
                     : new NewTopic(topic.name, topic.partitions, 
topic.replicationFactor.map(Integer::shortValue));
 
-                Map<String, String> configsMap = 
topic.configsToAdd.stringPropertyNames().stream()
-                    .collect(Collectors.toMap(name -> name, 
topic.configsToAdd::getProperty));
-
-                newTopic.configs(configsMap);
+                newTopic.configs(topic.configsToAdd);
                 CreateTopicsResult createResult = 
adminClient.createTopics(Set.of(newTopic),
                     new CreateTopicsOptions().retryOnQuotaViolation(false));
                 createResult.all().get();

Reply via email to