This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d8978ad013b MINOR: Replace Properties with Map in
ControllerConfigurationValidator validation path (#21481)
d8978ad013b is described below
commit d8978ad013b65c5dd16a0a820e5a154285bdea60
Author: majialong <[email protected]>
AuthorDate: Tue Mar 17 23:03:30 2026 +0800
MINOR: Replace Properties with Map in ControllerConfigurationValidator
validation path (#21481)
Refactor `ControllerConfigurationValidator` and downstream validation
methods to use `Map` instead of `Properties`. Also add explicit
null-value validation for `CLIENT_METRICS` configs, which previously
relied on `Properties` throwing `NullPointerException` for null values.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../server/ControllerConfigurationValidator.scala | 60 ++++++++++------------
.../test/scala/unit/kafka/log/LogConfigTest.scala | 22 ++++----
.../ControllerConfigurationValidatorTest.scala | 10 ++++
.../kafka/coordinator/group/GroupConfig.java | 13 ++---
.../kafka/coordinator/group/GroupConfigTest.java | 18 +++----
.../apache/kafka/raft/internals/KafkaRaftLog.java | 17 +++---
.../kafka/server/metrics/ClientMetricsConfigs.java | 16 +++---
.../kafka/storage/internals/log/LogConfig.java | 34 ++++++------
.../java/org/apache/kafka/tools/TopicCommand.java | 15 +++---
9 files changed, 104 insertions(+), 101 deletions(-)
diff --git
a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
index 89fdb1d4242..7fdf7b00a98 100644
--- a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
+++ b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
@@ -18,7 +18,6 @@
package kafka.server
import java.util
-import java.util.Properties
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.ConfigResource.Type.{BROKER,
CLIENT_METRICS, GROUP, TOPIC}
import org.apache.kafka.controller.ConfigurationValidator
@@ -78,6 +77,28 @@ class ControllerConfigurationValidator(kafkaConfig:
KafkaConfig) extends Configu
}
}
+ private def filterAndValidateNullConfigs(
+ newConfigs: util.Map[String, String],
+ resourceTypeName: String
+ ): util.HashMap[String, String] = {
+ val filteredConfigs = new util.HashMap[String, String]()
+ val nullConfigs = new mutable.ArrayBuffer[String]()
+
+ newConfigs.forEach((key, value) => {
+ if (value == null) {
+ nullConfigs += key
+ } else {
+ filteredConfigs.put(key, value)
+ }
+ })
+ if (nullConfigs.nonEmpty) {
+ throw new InvalidConfigurationException(s"Null value not supported for
$resourceTypeName configs: " +
+ nullConfigs.mkString(","))
+ }
+
+ filteredConfigs
+ }
+
private def throwExceptionForUnknownResourceType(
resource: ConfigResource
): Unit = {
@@ -104,42 +125,17 @@ class ControllerConfigurationValidator(kafkaConfig:
KafkaConfig) extends Configu
resource.`type`() match {
case TOPIC =>
validateTopicName(resource.name())
- val properties = new Properties()
- val nullTopicConfigs = new mutable.ArrayBuffer[String]()
- newConfigs.forEach((key, value) => {
- if (value == null) {
- nullTopicConfigs += key
- } else {
- properties.setProperty(key, value)
- }
- })
- if (nullTopicConfigs.nonEmpty) {
- throw new InvalidConfigurationException("Null value not supported
for topic configs: " +
- nullTopicConfigs.mkString(","))
- }
- LogConfig.validate(oldConfigs, properties,
kafkaConfig.extractLogConfigMap,
+ val filteredConfigs = filterAndValidateNullConfigs(newConfigs, "topic")
+ LogConfig.validate(oldConfigs, filteredConfigs,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
case BROKER => validateBrokerName(resource.name())
case CLIENT_METRICS =>
- val properties = new Properties()
- newConfigs.forEach((key, value) => properties.setProperty(key, value))
- ClientMetricsConfigs.validate(resource.name(), properties)
+ val filteredConfigs = filterAndValidateNullConfigs(newConfigs, "client
metrics")
+ ClientMetricsConfigs.validate(resource.name(), filteredConfigs)
case GROUP =>
validateGroupName(resource.name())
- val properties = new Properties()
- val nullGroupConfigs = new mutable.ArrayBuffer[String]()
- newConfigs.forEach((key, value) => {
- if (value == null) {
- nullGroupConfigs += key
- } else {
- properties.setProperty(key, value)
- }
- })
- if (nullGroupConfigs.nonEmpty) {
- throw new InvalidConfigurationException("Null value not supported
for group configs: " +
- nullGroupConfigs.mkString(","))
- }
- GroupConfig.validate(properties, kafkaConfig.groupCoordinatorConfig,
kafkaConfig.shareGroupConfig)
+ val filteredConfigs = filterAndValidateNullConfigs(newConfigs, "group")
+ GroupConfig.validate(filteredConfigs,
kafkaConfig.groupCoordinatorConfig, kafkaConfig.shareGroupConfig)
case _ => throwExceptionForUnknownResourceType(resource)
}
}
diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
index 98a49070913..ecef38cfbe7 100644
--- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -81,9 +81,9 @@ class LogConfigTest {
@Test
def testInvalidCompactionLagConfig(): Unit = {
- val props = new Properties
- props.setProperty(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG, "100")
- props.setProperty(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, "200")
+ val props = new util.HashMap[String, String]
+ props.put(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG, "100")
+ props.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, "200")
assertThrows(classOf[Exception], () => LogConfig.validate(props))
}
@@ -265,7 +265,7 @@ class LogConfigTest {
kafkaProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP,
"true")
val kafkaConfig = KafkaConfig.fromProps(kafkaProps)
- val props = new Properties()
+ val props = new util.HashMap[String, String]()
props.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
props.put(TopicConfig.RETENTION_BYTES_CONFIG, retentionBytes.toString)
props.put(TopicConfig.RETENTION_MS_CONFIG, retentionMs.toString)
@@ -281,7 +281,7 @@ class LogConfigTest {
val kafkaProps = TestUtils.createDummyBrokerConfig()
kafkaProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP,
"true")
val kafkaConfig = KafkaConfig.fromProps(kafkaProps)
- val logProps = new Properties()
+ val logProps = new util.HashMap[String, String]()
def validateCleanupPolicy(): Unit = {
LogConfig.validate(util.Map.of, logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled)
}
@@ -307,7 +307,7 @@ class LogConfigTest {
kafkaProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP,
sysRemoteStorageEnabled.toString)
val kafkaConfig = KafkaConfig.fromProps(kafkaProps)
- val logProps = new Properties()
+ val logProps = new util.HashMap[String, String]()
logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
if (sysRemoteStorageEnabled) {
LogConfig.validate(util.Map.of, logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled)
@@ -325,7 +325,7 @@ class LogConfigTest {
kafkaProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP,
"true")
val kafkaConfig = KafkaConfig.fromProps(kafkaProps)
- val logProps = new Properties()
+ val logProps = new util.HashMap[String, String]()
logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false")
if (wasRemoteStorageEnabled) {
val message = assertThrows(classOf[InvalidConfigurationException],
@@ -357,7 +357,7 @@ class LogConfigTest {
val kafkaConfig = KafkaConfig.fromProps(kafkaProps)
// Topic local log retention time inherited from Broker is greater than
the topic's complete log retention time
- val logProps = new Properties()
+ val logProps = new util.HashMap[String, String]()
logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
sysRemoteStorageEnabled.toString)
logProps.put(TopicConfig.RETENTION_MS_CONFIG, "500")
if (sysRemoteStorageEnabled) {
@@ -381,7 +381,7 @@ class LogConfigTest {
val kafkaConfig = KafkaConfig.fromProps(props)
// Topic local retention size inherited from Broker is greater than the
topic's complete log retention size
- val logProps = new Properties()
+ val logProps = new util.HashMap[String, String]()
logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
sysRemoteStorageEnabled.toString)
logProps.put(TopicConfig.RETENTION_BYTES_CONFIG, "128")
if (sysRemoteStorageEnabled) {
@@ -416,7 +416,7 @@ class LogConfigTest {
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testValidRemoteLogCopyDisabled(copyDisabled: Boolean): Unit = {
- val logProps = new Properties
+ val logProps = new util.HashMap[String, String]
logProps.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG,
copyDisabled.toString)
LogConfig.validate(logProps)
}
@@ -424,7 +424,7 @@ class LogConfigTest {
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testValidRemoteLogDeleteOnDisable(deleteOnDisable: Boolean): Unit = {
- val logProps = new Properties
+ val logProps = new util.HashMap[String, String]
logProps.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG,
deleteOnDisable.toString)
LogConfig.validate(logProps)
}
diff --git
a/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
b/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
index 3056753f53b..9e99f01c0e1 100644
---
a/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
+++
b/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
@@ -202,6 +202,16 @@ class ControllerConfigurationValidatorTest {
new ConfigResource(GROUP, "group"), config, emptyMap())).getMessage)
}
+ @Test
+ def testNullClientMetricsConfigValue(): Unit = {
+ val config = new util.TreeMap[String, String]()
+ config.put(ClientMetricsConfigs.INTERVAL_MS_CONFIG, "2000")
+ config.put(ClientMetricsConfigs.MATCH_CONFIG, null)
+ assertEquals("Null value not supported for client metrics configs: match",
+ assertThrows(classOf[InvalidConfigurationException], () =>
validator.validate(
+ new ConfigResource(CLIENT_METRICS, "subscription-1"), config,
emptyMap())).getMessage)
+ }
+
@Test
def testInvalidGroupConfig(): Unit = {
val config = new util.TreeMap[String, String]()
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
index e0b52574fc3..db2cc128284 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
@@ -27,6 +27,7 @@ import
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
@@ -237,9 +238,9 @@ public final class GroupConfig extends AbstractConfig {
/**
* Check that property names are valid
*/
- public static void validateNames(Properties props) {
+ public static void validateNames(Map<String, ?> props) {
Set<String> names = configNames();
- for (String name : props.stringPropertyNames()) {
+ for (String name : props.keySet()) {
if (!names.contains(name)) {
throw new InvalidConfigurationException("Unknown group config
name: " + name);
}
@@ -250,7 +251,7 @@ public final class GroupConfig extends AbstractConfig {
* Validates the values of the given properties.
*/
@SuppressWarnings({"CyclomaticComplexity", "NPathComplexity"})
- private static void validateValues(Map<?, ?> valueMaps,
GroupCoordinatorConfig groupCoordinatorConfig, ShareGroupConfig
shareGroupConfig) {
+ private static void validateValues(Map<String, ?> valueMaps,
GroupCoordinatorConfig groupCoordinatorConfig, ShareGroupConfig
shareGroupConfig) {
int consumerHeartbeatInterval = (Integer)
valueMaps.get(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG);
int consumerSessionTimeout = (Integer)
valueMaps.get(CONSUMER_SESSION_TIMEOUT_MS_CONFIG);
int shareHeartbeatInterval = (Integer)
valueMaps.get(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG);
@@ -356,13 +357,13 @@ public final class GroupConfig extends AbstractConfig {
* all values can be parsed and are valid. The provided properties are
merged with
* the broker-level defaults before validation.
*/
- public static void validate(Properties props, GroupCoordinatorConfig
groupCoordinatorConfig, ShareGroupConfig shareGroupConfig) {
- Properties combinedConfigs = new Properties();
+ public static void validate(Map<String, ?> props, GroupCoordinatorConfig
groupCoordinatorConfig, ShareGroupConfig shareGroupConfig) {
+ Map<String, Object> combinedConfigs = new HashMap<>();
combinedConfigs.putAll(groupCoordinatorConfig.extractGroupConfigMap(shareGroupConfig));
combinedConfigs.putAll(props);
validateNames(combinedConfigs);
- Map<?, ?> valueMaps = CONFIG.parse(combinedConfigs);
+ Map<String, Object> valueMaps = CONFIG.parse(combinedConfigs);
validateValues(valueMaps, groupCoordinatorConfig, shareGroupConfig);
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
index bd7bec0e98f..9b28b591574 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
@@ -120,7 +120,7 @@ public class GroupConfigTest {
@Test
public void testValidShareAutoOffsetResetValues() {
- Properties props = createValidGroupConfig();
+ Map<String, String> props = createValidGroupConfig();
// Check for value "latest"
props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "latest");
@@ -139,7 +139,7 @@ public class GroupConfigTest {
@Test
public void testValidShareIsolationLevelValues() {
// Check for value READ_UNCOMMITTED
- Properties props = createValidGroupConfig();
+ Map<String, String> props = createValidGroupConfig();
props.put(GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG, "read_committed");
doTestValidProps(props);
@@ -152,7 +152,7 @@ public class GroupConfigTest {
@Test
public void testInvalidProps() {
- Properties props = createValidGroupConfig();
+ Map<String, String> props = createValidGroupConfig();
// Check for invalid consumerSessionTimeoutMs, < MIN
props.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "1");
@@ -276,11 +276,11 @@ public class GroupConfigTest {
doTestInvalidProps(props, ConfigException.class);
}
- private void doTestInvalidProps(Properties props, Class<? extends
Exception> exceptionClassName) {
+ private void doTestInvalidProps(Map<String, String> props, Class<? extends
Exception> exceptionClassName) {
assertThrows(exceptionClassName, () -> GroupConfig.validate(props,
createGroupCoordinatorConfig(), createShareGroupConfig()));
}
- private void doTestValidProps(Properties props) {
+ private void doTestValidProps(Map<String, String> props) {
assertDoesNotThrow(() -> GroupConfig.validate(props,
createGroupCoordinatorConfig(), createShareGroupConfig()));
}
@@ -324,7 +324,7 @@ public class GroupConfigTest {
@Test
public void testInvalidConfigName() {
- Properties props = new Properties();
+ Map<String, String> props = new HashMap<>();
props.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "10");
props.put("invalid.config.name", "10");
assertThrows(InvalidConfigurationException.class, () ->
GroupConfig.validate(props, createGroupCoordinatorConfig(),
createShareGroupConfig()));
@@ -347,7 +347,7 @@ public class GroupConfigTest {
ShareGroupConfig shareGroupConfig =
ShareGroupConfig.fromProps(overrides);
assertDoesNotThrow(() ->
- GroupConfig.validate(new Properties(), groupCoordinatorConfig,
shareGroupConfig));
+ GroupConfig.validate(new HashMap<>(), groupCoordinatorConfig,
shareGroupConfig));
}
@Test
@@ -487,8 +487,8 @@ public class GroupConfigTest {
assertEquals(expectedMax, result.get(key));
}
- private Properties createValidGroupConfig() {
- Properties props = new Properties();
+ private Map<String, String> createValidGroupConfig() {
+ Map<String, String> props = new HashMap<>();
props.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "45000");
props.put(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, "5000");
props.put(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG, "45000");
diff --git
a/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftLog.java
b/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftLog.java
index 5b0ca350542..b168dc66c46 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftLog.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftLog.java
@@ -72,7 +72,6 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
-import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
@@ -674,19 +673,19 @@ public class KafkaRaftLog implements RaftLog {
Scheduler scheduler,
MetadataLogConfig config,
int nodeId) throws IOException {
- Properties props = new Properties();
- props.setProperty(TopicConfig.MAX_MESSAGE_BYTES_CONFIG,
String.valueOf(config.internalMaxBatchSizeInBytes()));
+ Map<String, String> props = new HashMap<>();
+ props.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG,
String.valueOf(config.internalMaxBatchSizeInBytes()));
if (config.internalSegmentBytes() != null) {
- props.setProperty(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG,
String.valueOf(config.internalSegmentBytes()));
+ props.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG,
String.valueOf(config.internalSegmentBytes()));
} else {
- props.setProperty(TopicConfig.SEGMENT_BYTES_CONFIG,
String.valueOf(config.logSegmentBytes()));
+ props.put(TopicConfig.SEGMENT_BYTES_CONFIG,
String.valueOf(config.logSegmentBytes()));
}
- props.setProperty(TopicConfig.SEGMENT_MS_CONFIG,
String.valueOf(config.logSegmentMillis()));
- props.setProperty(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG,
String.valueOf(ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT));
+ props.put(TopicConfig.SEGMENT_MS_CONFIG,
String.valueOf(config.logSegmentMillis()));
+ props.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG,
String.valueOf(ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT));
// Disable time and byte retention when deleting segments
- props.setProperty(TopicConfig.RETENTION_MS_CONFIG, "-1");
- props.setProperty(TopicConfig.RETENTION_BYTES_CONFIG, "-1");
+ props.put(TopicConfig.RETENTION_MS_CONFIG, "-1");
+ props.put(TopicConfig.RETENTION_BYTES_CONFIG, "-1");
LogConfig.validate(props);
LogConfig defaultLogConfig = new LogConfig(props);
diff --git
a/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java
b/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java
index b7a66df7b16..90a36d5d3e7 100644
---
a/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java
+++
b/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java
@@ -140,27 +140,27 @@ public class ClientMetricsConfigs extends AbstractConfig {
return CONFIG.names();
}
- public static void validate(String subscriptionName, Properties
properties) {
+ public static void validate(String subscriptionName, Map<?, ?> props) {
if (subscriptionName == null || subscriptionName.isEmpty()) {
throw new InvalidRequestException("Subscription name can't be
empty");
}
- validateProperties(properties);
+ validateConfigs(props);
}
@SuppressWarnings("unchecked")
- private static void validateProperties(Properties properties) {
- // Make sure that all the properties are valid
- properties.forEach((key, value) -> {
+ private static void validateConfigs(Map<?, ?> configs) {
+ // Make sure that all the configs are valid
+ configs.forEach((key, value) -> {
if (!names().contains(key)) {
throw new InvalidRequestException("Unknown client metrics
configuration: " + key);
}
});
- Map<String, Object> parsed = CONFIG.parse(properties);
+ Map<String, Object> parsed = CONFIG.parse(configs);
// Make sure that push interval is between 100ms and 1 hour.
- if (properties.containsKey(INTERVAL_MS_CONFIG)) {
+ if (configs.containsKey(INTERVAL_MS_CONFIG)) {
int pushIntervalMs = (Integer) parsed.get(INTERVAL_MS_CONFIG);
if (pushIntervalMs < MIN_INTERVAL_MS || pushIntervalMs >
MAX_INTERVAL_MS) {
String msg = String.format("Invalid value %s for %s, interval
must be between 100 and 3600000 (1 hour)",
@@ -170,7 +170,7 @@ public class ClientMetricsConfigs extends AbstractConfig {
}
// Make sure that client match patterns are valid by parsing them.
- if (properties.containsKey(MATCH_CONFIG)) {
+ if (configs.containsKey(MATCH_CONFIG)) {
List<String> patterns = (List<String>) parsed.get(MATCH_CONFIG);
// Parse the client matching patterns to validate if the patterns
are valid.
parseMatchingPatterns(patterns);
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
index f81d224e7ea..4276ac71b3d 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
@@ -455,9 +455,9 @@ public class LogConfig extends AbstractConfig {
/**
* Check that property names are valid
*/
- public static void validateNames(Properties props) {
+ public static void validateNames(Map<String, String> props) {
List<String> names = configNames();
- for (Object name : props.keySet())
+ for (String name : props.keySet())
if (!names.contains(name))
throw new InvalidConfigurationException("Unknown topic config
name: " + name);
}
@@ -468,7 +468,7 @@ public class LogConfig extends AbstractConfig {
* LogConfig class.
* @param props The properties to be validated
*/
- public static void validateValues(Map<?, ?> props) {
+ public static void validateValues(Map<String, ?> props) {
long minCompactionLag = (Long)
props.get(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG);
long maxCompactionLag = (Long)
props.get(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG);
if (minCompactionLag > maxCompactionLag) {
@@ -484,7 +484,7 @@ public class LogConfig extends AbstractConfig {
* The default values should be extracted from the KafkaConfig.
* @param props The properties to be validated
*/
- public static void validateBrokerLogConfigValues(Map<?, ?> props,
+ public static void validateBrokerLogConfigValues(Map<String, ?> props,
boolean
isRemoteLogStorageSystemEnabled) {
validateValues(props);
if (isRemoteLogStorageSystemEnabled) {
@@ -502,7 +502,7 @@ public class LogConfig extends AbstractConfig {
* @param isRemoteLogStorageSystemEnabled true if system wise remote log
storage is enabled
*/
private static void validateTopicLogConfigValues(Map<String, String>
existingConfigs,
- Map<?, ?> newConfigs,
+ Map<String, ?> newConfigs,
boolean
isRemoteLogStorageSystemEnabled) {
validateValues(newConfigs);
@@ -520,7 +520,7 @@ public class LogConfig extends AbstractConfig {
}
}
- public static void validateTurningOffRemoteStorageWithDelete(Map<?, ?>
newConfigs, boolean wasRemoteLogEnabled, boolean isRemoteLogStorageEnabled) {
+ public static void validateTurningOffRemoteStorageWithDelete(Map<String,
?> newConfigs, boolean wasRemoteLogEnabled, boolean isRemoteLogStorageEnabled) {
boolean isRemoteLogDeleteOnDisable = (Boolean)
Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG,
false);
if (wasRemoteLogEnabled && !isRemoteLogStorageEnabled &&
!isRemoteLogDeleteOnDisable) {
throw new InvalidConfigurationException("It is invalid to disable
remote storage without deleting remote data. " +
@@ -529,7 +529,7 @@ public class LogConfig extends AbstractConfig {
}
}
- public static void validateRetentionConfigsWhenRemoteCopyDisabled(Map<?,
?> newConfigs, boolean isRemoteLogStorageEnabled) {
+ public static void
validateRetentionConfigsWhenRemoteCopyDisabled(Map<String, ?> newConfigs,
boolean isRemoteLogStorageEnabled) {
boolean isRemoteLogCopyDisabled = (Boolean)
Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG,
false);
long retentionMs = (Long)
newConfigs.get(TopicConfig.RETENTION_MS_CONFIG);
long localRetentionMs = (Long)
newConfigs.get(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG);
@@ -547,7 +547,7 @@ public class LogConfig extends AbstractConfig {
}
}
- public static void validateRemoteStorageOnlyIfSystemEnabled(Map<?, ?>
props, boolean isRemoteLogStorageSystemEnabled, boolean
isReceivingConfigFromStore) {
+ public static void validateRemoteStorageOnlyIfSystemEnabled(Map<String, ?>
props, boolean isRemoteLogStorageSystemEnabled, boolean
isReceivingConfigFromStore) {
boolean isRemoteLogStorageEnabled = (Boolean)
props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
if (isRemoteLogStorageEnabled && !isRemoteLogStorageSystemEnabled) {
if (isReceivingConfigFromStore) {
@@ -560,14 +560,14 @@ public class LogConfig extends AbstractConfig {
}
@SuppressWarnings("unchecked")
- private static void
validateRemoteStorageRequiresDeleteCleanupPolicy(Map<?, ?> props) {
+ private static void
validateRemoteStorageRequiresDeleteCleanupPolicy(Map<String, ?> props) {
List<String> cleanupPolicy = (List<String>)
props.get(TopicConfig.CLEANUP_POLICY_CONFIG);
if (!cleanupPolicy.isEmpty() && (cleanupPolicy.size() != 1 ||
!TopicConfig.CLEANUP_POLICY_DELETE.equals(cleanupPolicy.get(0)))) {
throw new ConfigException("Remote log storage only supports topics
with cleanup.policy=delete or cleanup.policy being an empty list.");
}
}
- private static void validateRemoteStorageRetentionSize(Map<?, ?> props) {
+ private static void validateRemoteStorageRetentionSize(Map<String, ?>
props) {
Long retentionBytes = (Long)
props.get(TopicConfig.RETENTION_BYTES_CONFIG);
Long localRetentionBytes = (Long)
props.get(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG);
if (retentionBytes > -1 && localRetentionBytes != -2) {
@@ -584,7 +584,7 @@ public class LogConfig extends AbstractConfig {
}
}
- private static void validateRemoteStorageRetentionTime(Map<?, ?> props) {
+ private static void validateRemoteStorageRetentionTime(Map<String, ?>
props) {
Long retentionMs = (Long) props.get(TopicConfig.RETENTION_MS_CONFIG);
Long localRetentionMs = (Long)
props.get(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG);
if (retentionMs != -1 && localRetentionMs != -2) {
@@ -604,22 +604,22 @@ public class LogConfig extends AbstractConfig {
/**
* Check that the given properties contain only valid log config names and
that all values can be parsed and are valid
*/
- public static void validate(Properties props) {
+ public static void validate(Map<String, String> props) {
validate(Map.of(), props, Map.of(), false);
}
public static void validate(Map<String, String> existingConfigs,
- Properties props,
- Map<?, ?> configuredProps,
+ Map<String, String> props,
+ Map<String, ?> configuredProps,
boolean isRemoteLogStorageSystemEnabled) {
validateNames(props);
if (configuredProps == null || configuredProps.isEmpty()) {
- Map<?, ?> valueMaps = CONFIG.parse(props);
+ Map<String, ?> valueMaps = CONFIG.parse(props);
validateValues(valueMaps);
} else {
- Map<Object, Object> combinedConfigs = new
HashMap<>(configuredProps);
+ Map<String, Object> combinedConfigs = new
HashMap<>(configuredProps);
combinedConfigs.putAll(props);
- Map<?, ?> valueMaps = CONFIG.parse(combinedConfigs);
+ Map<String, ?> valueMaps = CONFIG.parse(combinedConfigs);
validateTopicLogConfigValues(existingConfigs, valueMaps,
isRemoteLogStorageSystemEnabled);
}
}
diff --git a/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java
b/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java
index 1d1d5c2f72c..691625b377e 100644
--- a/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java
@@ -60,6 +60,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
@@ -154,7 +155,7 @@ public abstract class TopicCommand {
return ret;
}
- private static Properties parseTopicConfigsToBeAdded(TopicCommandOptions
opts) {
+ private static Map<String, String>
parseTopicConfigsToBeAdded(TopicCommandOptions opts) {
List<List<String>> configsToBeAdded =
opts.topicConfig().orElse(List.of())
.stream()
.map(s -> List.of(s.split("\\s*=\\s*")))
@@ -164,9 +165,8 @@ public abstract class TopicCommand {
throw new IllegalArgumentException("requirement failed: Invalid
topic config: all configs to be added must be in the format \"key=val\".");
}
- Properties props = new Properties();
- configsToBeAdded.stream()
- .forEach(pair -> props.setProperty(pair.get(0).trim(),
pair.get(1).trim()));
+ Map<String, String> props = new HashMap<>();
+ configsToBeAdded.forEach(pair -> props.put(pair.get(0).trim(),
pair.get(1).trim()));
LogConfig.validate(props);
return props;
}
@@ -243,7 +243,7 @@ public abstract class TopicCommand {
private final Optional<Integer> partitions;
private final Optional<Integer> replicationFactor;
private final Map<Integer, List<Integer>> replicaAssignment;
- private final Properties configsToAdd;
+ private final Map<String, String> configsToAdd;
private final TopicCommandOptions opts;
@@ -455,10 +455,7 @@ public abstract class TopicCommand {
? new NewTopic(topic.name, topic.replicaAssignment)
: new NewTopic(topic.name, topic.partitions,
topic.replicationFactor.map(Integer::shortValue));
- Map<String, String> configsMap =
topic.configsToAdd.stringPropertyNames().stream()
- .collect(Collectors.toMap(name -> name,
topic.configsToAdd::getProperty));
-
- newTopic.configs(configsMap);
+ newTopic.configs(topic.configsToAdd);
CreateTopicsResult createResult =
adminClient.createTopics(Set.of(newTopic),
new CreateTopicsOptions().retryOnQuotaViolation(false));
createResult.all().get();