chia7712 commented on code in PR #21481:
URL: https://github.com/apache/kafka/pull/21481#discussion_r2891594749
##########
core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala:
##########
@@ -104,42 +103,53 @@ class ControllerConfigurationValidator(kafkaConfig:
KafkaConfig) extends Configu
resource.`type`() match {
case TOPIC =>
validateTopicName(resource.name())
- val properties = new Properties()
+ val filteredConfigs = new util.HashMap[String, String]()
val nullTopicConfigs = new mutable.ArrayBuffer[String]()
newConfigs.forEach((key, value) => {
if (value == null) {
nullTopicConfigs += key
} else {
- properties.setProperty(key, value)
+ filteredConfigs.put(key, value)
}
})
if (nullTopicConfigs.nonEmpty) {
throw new InvalidConfigurationException("Null value not supported
for topic configs: " +
nullTopicConfigs.mkString(","))
}
- LogConfig.validate(oldConfigs, properties,
kafkaConfig.extractLogConfigMap,
+ LogConfig.validate(oldConfigs, filteredConfigs,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
case BROKER => validateBrokerName(resource.name())
case CLIENT_METRICS =>
- val properties = new Properties()
- newConfigs.forEach((key, value) => properties.setProperty(key, value))
- ClientMetricsConfigs.validate(resource.name(), properties)
+ val filteredConfigs = new util.HashMap[String, String]()
Review Comment:
It seems we can extract this common logic
```scala
private def filterAndValidateNullConfigs(newConfigs: util.Map[String,
String], resourceTypeName: String): util.Map[String, String] = {
val filteredConfigs = new util.HashMap[String, String]()
val nullConfigs = new mutable.ArrayBuffer[String]()
newConfigs.forEach((key, value) => {
if (value == null) {
nullConfigs += key
} else {
filteredConfigs.put(key, value)
}
})
if (nullConfigs.nonEmpty) {
throw new InvalidConfigurationException(
s"Null value not supported for $resourceTypeName configs:
${nullConfigs.mkString(",")}"
)
}
filteredConfigs
}
override def validate(
resource: ConfigResource,
newConfigs: util.Map[String, String],
oldConfigs: util.Map[String, String]
): Unit = {
resource.`type`() match {
case TOPIC =>
validateTopicName(resource.name())
val filteredConfigs = filterAndValidateNullConfigs(newConfigs,
"topic")
LogConfig.validate(oldConfigs, filteredConfigs,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
case BROKER => validateBrokerName(resource.name())
case CLIENT_METRICS =>
val filteredConfigs = filterAndValidateNullConfigs(newConfigs,
"client metrics")
ClientMetricsConfigs.validate(resource.name(), filteredConfigs)
case GROUP =>
validateGroupName(resource.name())
val filteredConfigs = filterAndValidateNullConfigs(newConfigs,
"group")
GroupConfigManager.validate(filteredConfigs,
kafkaConfig.groupCoordinatorConfig, kafkaConfig.shareGroupConfig)
case _ => throwExceptionForUnknownResourceType(resource)
}
}
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java:
##########
@@ -208,9 +208,9 @@ public static Set<String> configNames() {
/**
* Check that property names are valid
*/
- public static void validateNames(Properties props) {
+ public static void validateNames(Map<?, ?> props) {
Review Comment:
The key is always a string, right?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]