chia7712 commented on code in PR #16381:
URL: https://github.com/apache/kafka/pull/16381#discussion_r1668602936


##########
core/src/main/scala/kafka/admin/ConfigCommand.scala:
##########
@@ -359,61 +359,64 @@ object ConfigCommand extends Logging {
     val entityTypes = opts.entityTypes
     val entityNames = opts.entityNames
     val entityTypeHead = entityTypes.head
-    val entityNameHead = entityNames.head
     val configsToBeAddedMap = parseConfigsToBeAdded(opts).asScala.toMap // no 
need for mutability
     val configsToBeAdded = configsToBeAddedMap.map { case (k, v) => (k, new 
ConfigEntry(k, v)) }
     val configsToBeDeleted = parseConfigsToBeDeleted(opts)
 
     entityTypeHead match {
       case ConfigType.TOPIC =>
-        val oldConfig = getResourceConfig(adminClient, entityTypeHead, 
entityNameHead, includeSynonyms = false, describeAll = false)
-          .map { entry => (entry.name, entry) }.toMap
-
-        // fail the command if any of the configs to be deleted does not exist
-        val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
-        if (invalidConfigs.nonEmpty)
-          throw new InvalidConfigurationException(s"Invalid config(s): 
${invalidConfigs.mkString(",")}")
-
-        val configResource = new ConfigResource(ConfigResource.Type.TOPIC, 
entityNameHead)
-        val alterOptions = new 
AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
-        val alterEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, 
AlterConfigOp.OpType.SET))
-          ++ configsToBeDeleted.map { k => new AlterConfigOp(new 
ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
-        ).asJavaCollection
-        adminClient.incrementalAlterConfigs(Map(configResource -> 
alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
-
+        entityNames.foreach { entityName =>
+          val oldConfig = getResourceConfig(adminClient, entityTypeHead, 
entityName, includeSynonyms = false, describeAll = false)
+            .map { entry => (entry.name, entry) }.toMap
+
+          // fail the command if any of the configs to be deleted does not 
exist
+          val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
+          if (invalidConfigs.nonEmpty)
+            throw new InvalidConfigurationException(s"Invalid config(s): 
${invalidConfigs.mkString(",")}")
+
+          val configResource = new ConfigResource(ConfigResource.Type.TOPIC, 
entityName)
+          val alterOptions = new 
AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
+          val alterEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, 
AlterConfigOp.OpType.SET))
+            ++ configsToBeDeleted.map { k => new AlterConfigOp(new 
ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
+            ).asJavaCollection
+          adminClient.incrementalAlterConfigs(Map(configResource -> 
alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
+        }
       case ConfigType.BROKER =>
-        val oldConfig = getResourceConfig(adminClient, entityTypeHead, 
entityNameHead, includeSynonyms = false, describeAll = false)
-          .map { entry => (entry.name, entry) }.toMap
-
-        // fail the command if any of the configs to be deleted does not exist
-        val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
-        if (invalidConfigs.nonEmpty)
-          throw new InvalidConfigurationException(s"Invalid config(s): 
${invalidConfigs.mkString(",")}")
-
-        val newEntries = oldConfig ++ configsToBeAdded -- configsToBeDeleted
-        val sensitiveEntries = newEntries.filter(_._2.value == null)
-        if (sensitiveEntries.nonEmpty)
-          throw new InvalidConfigurationException(s"All sensitive broker 
config entries must be specified for --alter, missing entries: 
${sensitiveEntries.keySet}")
-        val newConfig = new JConfig(newEntries.asJava.values)
-
-        val configResource = new ConfigResource(ConfigResource.Type.BROKER, 
entityNameHead)
-        val alterOptions = new 
AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
-        adminClient.alterConfigs(Map(configResource -> newConfig).asJava, 
alterOptions).all().get(60, TimeUnit.SECONDS)
+        entityNames.foreach { entityName =>

Review Comment:
   Could you please move those code to separate method? That can reduce the 
complexity and it can be test easily.



##########
core/src/main/scala/kafka/admin/ConfigCommand.scala:
##########
@@ -359,61 +359,64 @@ object ConfigCommand extends Logging {
     val entityTypes = opts.entityTypes
     val entityNames = opts.entityNames
     val entityTypeHead = entityTypes.head
-    val entityNameHead = entityNames.head
     val configsToBeAddedMap = parseConfigsToBeAdded(opts).asScala.toMap // no 
need for mutability
     val configsToBeAdded = configsToBeAddedMap.map { case (k, v) => (k, new 
ConfigEntry(k, v)) }
     val configsToBeDeleted = parseConfigsToBeDeleted(opts)
 
     entityTypeHead match {
       case ConfigType.TOPIC =>
-        val oldConfig = getResourceConfig(adminClient, entityTypeHead, 
entityNameHead, includeSynonyms = false, describeAll = false)
-          .map { entry => (entry.name, entry) }.toMap
-
-        // fail the command if any of the configs to be deleted does not exist
-        val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
-        if (invalidConfigs.nonEmpty)
-          throw new InvalidConfigurationException(s"Invalid config(s): 
${invalidConfigs.mkString(",")}")
-
-        val configResource = new ConfigResource(ConfigResource.Type.TOPIC, 
entityNameHead)
-        val alterOptions = new 
AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
-        val alterEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, 
AlterConfigOp.OpType.SET))
-          ++ configsToBeDeleted.map { k => new AlterConfigOp(new 
ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
-        ).asJavaCollection
-        adminClient.incrementalAlterConfigs(Map(configResource -> 
alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
-
+        entityNames.foreach { entityName =>
+          val oldConfig = getResourceConfig(adminClient, entityTypeHead, 
entityName, includeSynonyms = false, describeAll = false)
+            .map { entry => (entry.name, entry) }.toMap
+
+          // fail the command if any of the configs to be deleted does not 
exist
+          val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
+          if (invalidConfigs.nonEmpty)
+            throw new InvalidConfigurationException(s"Invalid config(s): 
${invalidConfigs.mkString(",")}")
+
+          val configResource = new ConfigResource(ConfigResource.Type.TOPIC, 
entityName)
+          val alterOptions = new 
AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
+          val alterEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, 
AlterConfigOp.OpType.SET))
+            ++ configsToBeDeleted.map { k => new AlterConfigOp(new 
ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
+            ).asJavaCollection
+          adminClient.incrementalAlterConfigs(Map(configResource -> 
alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)

Review Comment:
   Could you please leverage `incrementalAlterConfigs` to send the requests 
together instead of loop?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to