dajac commented on code in PR #15304:
URL: https://github.com/apache/kafka/pull/15304#discussion_r1479611758
##########
core/src/test/scala/integration/kafka/admin/ConfigCommandIntegrationTest.scala:
##########
@@ -162,19 +174,198 @@ class ConfigCommandIntegrationTest extends
QuorumTestHarness with Logging {
assertThrows(classOf[ConfigException], () => alterConfigWithZk(configs,
None, encoderConfigs))
// Dynamic config updates using ZK should fail if broker is running.
- registerBrokerInZk(brokerId.toInt)
+ registerBrokerInZk(zkClient, brokerId.toInt)
assertThrows(classOf[IllegalArgumentException], () =>
alterConfigWithZk(Map("message.max.size" -> "210000"), Some(brokerId)))
assertThrows(classOf[IllegalArgumentException], () =>
alterConfigWithZk(Map("message.max.size" -> "220000"), None))
// Dynamic config updates using ZK for a different broker that is
not running should succeed
alterAndVerifyConfig(Map("message.max.size" -> "230000"), Some("2"))
}
- private def registerBrokerInZk(id: Int): Unit = {
+ private def registerBrokerInZk(zkClient: kafka.zk.KafkaZkClient, id: Int):
Unit = {
zkClient.createTopLevelPaths()
val securityProtocol = SecurityProtocol.PLAINTEXT
val endpoint = new EndPoint("localhost", 9092,
ListenerName.forSecurityProtocol(securityProtocol), securityProtocol)
val brokerInfo = BrokerInfo(Broker(id, Seq(endpoint), rack = None),
MetadataVersion.latestTesting, jmxPort = 9192)
zkClient.registerBroker(brokerInfo)
}
+
+ @ClusterTest
+ def testUpdateInvalidBrokersConfig(): Unit = {
+ checkInvalidBrokerConfig(None)
+
checkInvalidBrokerConfig(Some(cluster.anyBrokerSocketServer().config.brokerId.toString))
+ }
+
+ private def checkInvalidBrokerConfig(entityNameOrDefault: Option[String]):
Unit = {
+ for (incremental <- Array(true, false)) {
+ val entityNameParams = entityNameOrDefault.map(name =>
Array("--entity-name", name)).getOrElse(Array("--entity-default"))
+ ConfigCommand.alterConfig(cluster.createAdminClient(), new
ConfigCommandOptions(
+ Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
+ "--alter",
+ "--add-config", "invalid=2",
+ "--entity-type", "brokers")
+ ++ entityNameParams
+ ), incremental)
+
+ val describeResult = TestUtils.grabConsoleOutput(
+ ConfigCommand.describeConfig(cluster.createAdminClient(), new
ConfigCommandOptions(
+ Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
+ "--describe",
+ "--entity-type", "brokers")
+ ++ entityNameParams
+ )))
+ // We will treat unknown config as sensitive
+ assertTrue(describeResult.contains("sensitive=true"))
+      // The value of a sensitive config is not returned
+ assertTrue(describeResult.contains("invalid=null"))
+ }
+ }
+
+ @ClusterTest
+ def testUpdateInvalidTopicConfig(): Unit = {
+ TestUtils.createTopicWithAdminRaw(
+ admin = cluster.createAdminClient(),
+ topic = "test-config-topic",
+ )
+ assertInstanceOf(
+ classOf[InvalidConfigurationException],
+ assertThrows(
+ classOf[ExecutionException],
+ () => ConfigCommand.alterConfig(cluster.createAdminClient(), new
ConfigCommandOptions(
+ Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
+ "--alter",
+ "--add-config", "invalid=2",
+ "--entity-type", "topics",
+ "--entity-name", "test-config-topic")
+ ), true)).getCause
+ )
+ }
+
+ @ClusterTest
+ def testUpdateAndDeleteBrokersConfig(): Unit = {
+ checkBrokerConfig(None)
+
checkBrokerConfig(Some(cluster.anyBrokerSocketServer().config.brokerId.toString))
+ }
+
+ private def checkBrokerConfig(entityNameOrDefault: Option[String]): Unit = {
+ val entityNameParams = entityNameOrDefault.map(name =>
Array("--entity-name", name)).getOrElse(Array("--entity-default"))
+ ConfigCommand.alterConfig(cluster.createAdminClient(), new
ConfigCommandOptions(
+ Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
+ "--alter",
+ "--add-config", "log.cleaner.threads=2",
+ "--entity-type", "brokers")
+ ++ entityNameParams
+ ), true)
+ TestUtils.waitUntilTrue(
+ () => cluster.brokerSocketServers().asScala.forall(broker =>
broker.config.getInt("log.cleaner.threads") == 2),
+ "Timeout waiting for topic config propagating to broker")
+
+ val describeResult = TestUtils.grabConsoleOutput(
+ ConfigCommand.describeConfig(cluster.createAdminClient(), new
ConfigCommandOptions(
+ Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
+ "--describe",
+ "--entity-type", "brokers")
+ ++ entityNameParams
+ )))
+ assertTrue(describeResult.contains("log.cleaner.threads=2"))
+ assertTrue(describeResult.contains("sensitive=false"))
+
+ ConfigCommand.alterConfig(cluster.createAdminClient(), new
ConfigCommandOptions(
+ Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
+ "--alter",
+ "--delete-config", "log.cleaner.threads",
+ "--entity-type", "brokers")
+ ++ entityNameParams
+ ), true)
+ TestUtils.waitUntilTrue(
+ () =>
cluster.brokers().collect(Collectors.toList[KafkaBroker]).asScala.forall(broker
=> broker.config.getInt("log.cleaner.threads") != 2),
+ "Timeout waiting for topic config propagating to broker")
+
+ assertFalse(TestUtils.grabConsoleOutput(
+ ConfigCommand.describeConfig(cluster.createAdminClient(), new
ConfigCommandOptions(
+ Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
+ "--describe",
+ "--entity-type", "brokers")
+ ++ entityNameParams
+ ))).contains("log.cleaner.threads"))
+ }
+
+ @ClusterTest
+ def testUpdateConfigAndDeleteTopicConfig(): Unit = {
+ TestUtils.createTopicWithAdminRaw(
+ admin = cluster.createAdminClient(),
+ topic = "test-config-topic",
+ )
+ ConfigCommand.alterConfig(cluster.createAdminClient(), new
ConfigCommandOptions(
+ Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
+ "--alter",
+ "--add-config", "segment.bytes=10240000",
+ "--entity-type", "topics",
+ "--entity-name", "test-config-topic")
+ ), true)
+ TestUtils.waitUntilTrue(
+ () =>
cluster.brokers().collect(Collectors.toList[KafkaBroker]).asScala.forall(broker
=>
broker.logManager.logsByTopic("test-config-topic").head.config.getInt("segment.bytes")
== 10240000),
+ "Timeout waiting for topic config propagating to broker")
+
+ val describeResult = TestUtils.grabConsoleOutput(
+ ConfigCommand.describeConfig(cluster.createAdminClient(), new
ConfigCommandOptions(
+ Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
+ "--describe",
+ "--entity-type", "topics",
+ "--entity-name", "test-config-topic")
+ )))
+ assertTrue(describeResult.contains("segment.bytes=10240000"))
+ assertTrue(describeResult.contains("sensitive=false"))
+
+ ConfigCommand.alterConfig(cluster.createAdminClient(), new
ConfigCommandOptions(
+ Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
+ "--alter",
+ "--delete-config", "segment.bytes",
+ "--entity-type", "topics",
+ "--entity-name", "test-config-topic")
+ ), true)
+ TestUtils.waitUntilTrue(
+ () =>
cluster.brokers().collect(Collectors.toList[KafkaBroker]).asScala.forall(broker
=>
broker.logManager.logsByTopic("test-config-topic").head.config.getInt("segment.bytes")
!= 10240000),
+ "Timeout waiting for topic config propagating to broker")
+
+ assertFalse(TestUtils.grabConsoleOutput(
+ ConfigCommand.describeConfig(cluster.createAdminClient(), new
ConfigCommandOptions(
+ Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
+ "--describe",
+ "--entity-type", "topics",
+ "--entity-name", "test-config-topic")
+ ))).contains("segment.bytes"))
+ }
+
+ @ClusterTest
+ def testUpdateBrokerConfigNotAffectedByInvalidConfig(): Unit = {
+ // Test case from KAFKA-13788
+ ConfigCommand.alterConfig(cluster.createAdminClient(), new
ConfigCommandOptions(
+ Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
+ "--alter",
+ "--add-config", "log.cleaner.threadzz=2",
+ "--entity-type", "brokers",
+ "--entity-default")
+ ), true)
+
+ ConfigCommand.alterConfig(cluster.createAdminClient(), new
ConfigCommandOptions(
+ Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
+ "--alter",
+ "--add-config", "log.cleaner.threads=2",
+ "--entity-type", "brokers",
+ "--entity-default")
+ ), true)
+ }
+
+ // TODO this test doesn't make sense because we can't produce
`UnsupportedVersionException` by setting inter.broker.protocol.version
+ @ClusterTest(clusterType=Type.ZK, metadataVersion =
MetadataVersion.IBP_3_2_IV0)
+ def testFallbackToDeprecatedAlterConfigs(): Unit = {
+ ConfigCommand.alterConfig(cluster.createAdminClient(), new
ConfigCommandOptions(
+ Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
+ "--alter",
+ "--add-config", "log.cleaner.threads=2",
+ "--entity-type", "brokers",
+ "--entity-default")
+ ), true)
+ }
Review Comment:
@dengziming One option would be to mock the admin client. This would allow
you to verify the behavior. Otherwise, we could also run the tool against an
old version in a system test.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]