chia7712 commented on code in PR #16648:
URL: https://github.com/apache/kafka/pull/16648#discussion_r1703336803
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,249 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
super.tearDown()
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testAclOperationsWithOptionTimeoutMs(): Unit = {
+ val acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC,
"mytopic3", PatternType.LITERAL),
+ new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE,
AclPermissionType.ALLOW))
+ client = createAdminClient
+ val startTimeMs = Time.SYSTEM.milliseconds()
+
assertFutureExceptionTypeEquals(client.createAcls(Collections.singleton(acl),
new CreateAclsOptions().timeoutMs(2)).all(), classOf[TimeoutException])
Review Comment:
This is unstable, since the request may complete before the timeout when the
test runs fast enough. Maybe we can create an `Admin` with an incorrect
bootstrap address and then send this request with a timeout.
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,249 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
super.tearDown()
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testAclOperationsWithOptionTimeoutMs(): Unit = {
+ val acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC,
"mytopic3", PatternType.LITERAL),
+ new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE,
AclPermissionType.ALLOW))
+ client = createAdminClient
+ val startTimeMs = Time.SYSTEM.milliseconds()
+
assertFutureExceptionTypeEquals(client.createAcls(Collections.singleton(acl),
new CreateAclsOptions().timeoutMs(2)).all(), classOf[TimeoutException])
+ val endTimeMs = Time.SYSTEM.milliseconds()
+ assertTrue(endTimeMs > startTimeMs, "Expected the timeout to take at least
one millisecond.")
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeAclsWithOptionTimeoutMs(): Unit = {
+ val acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC,
"mytopic3", PatternType.LITERAL),
+ new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE,
AclPermissionType.ALLOW))
+ client = createAdminClient
+ client.createAcls(Collections.singleton(acl)).all()
+ val startTimeMs = Time.SYSTEM.milliseconds()
+ assertFutureExceptionTypeEquals(client.describeAcls(AclBindingFilter.ANY,
new DescribeAclsOptions().timeoutMs(2)).values(), classOf[TimeoutException])
+ val endTimeMs = Time.SYSTEM.milliseconds()
+ assertTrue(endTimeMs > startTimeMs, "Expected the timeout to take at least
one millisecond.")
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDeleteAclsWithOptionTimeoutMs(): Unit = {
+ val acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC,
"mytopic3", PatternType.LITERAL),
+ new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE,
AclPermissionType.ALLOW))
+ client = createAdminClient
+
+ client.createAcls(Collections.singleton(acl)).all()
+
+ val startTimeMs = Time.SYSTEM.milliseconds()
+
assertFutureExceptionTypeEquals(client.deleteAcls(Collections.singleton(AclBindingFilter.ANY),
new DeleteAclsOptions().timeoutMs(2)).all(), classOf[TimeoutException])
+ val endTimeMs = Time.SYSTEM.milliseconds()
+ assertTrue(endTimeMs > startTimeMs, "Expected the timeout to take at least
one millisecond.")
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeConfigWithOptionTimeoutMs(): Unit = {
+ client = createAdminClient
+
+ // Create topics
+ val topic1 = "describe-alter-configs-topic-1"
+ val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1)
+ val topicConfig1 = new Properties
+ topicConfig1.setProperty(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, "500000")
+ topicConfig1.setProperty(TopicConfig.RETENTION_MS_CONFIG, "60000000")
+ createTopic(topic1, numPartitions = 1, replicationFactor = 1, topicConfig1)
+
+ val topic2 = "describe-alter-configs-topic-2"
+ val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
+ createTopic(topic2)
+
+ // Describe topics and broker
+ val brokerResource1 = new ConfigResource(ConfigResource.Type.BROKER,
brokers(1).config.brokerId.toString)
+ val brokerResource2 = new ConfigResource(ConfigResource.Type.BROKER,
brokers(2).config.brokerId.toString)
+ val configResources = Seq(topicResource1, topicResource2, brokerResource1,
brokerResource2)
+
+ val startTimeMs = Time.SYSTEM.milliseconds()
+ assertFutureExceptionTypeEquals(
+ client.describeConfigs(configResources.asJava,new
DescribeConfigsOptions().timeoutMs(2)).all(),
Review Comment:
ditto
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,249 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
super.tearDown()
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testAclOperationsWithOptionTimeoutMs(): Unit = {
+ val acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC,
"mytopic3", PatternType.LITERAL),
+ new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE,
AclPermissionType.ALLOW))
+ client = createAdminClient
+ val startTimeMs = Time.SYSTEM.milliseconds()
+
assertFutureExceptionTypeEquals(client.createAcls(Collections.singleton(acl),
new CreateAclsOptions().timeoutMs(2)).all(), classOf[TimeoutException])
+ val endTimeMs = Time.SYSTEM.milliseconds()
+ assertTrue(endTimeMs > startTimeMs, "Expected the timeout to take at least
one millisecond.")
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeAclsWithOptionTimeoutMs(): Unit = {
+ val acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC,
"mytopic3", PatternType.LITERAL),
+ new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE,
AclPermissionType.ALLOW))
+ client = createAdminClient
+ client.createAcls(Collections.singleton(acl)).all()
+ val startTimeMs = Time.SYSTEM.milliseconds()
+ assertFutureExceptionTypeEquals(client.describeAcls(AclBindingFilter.ANY,
new DescribeAclsOptions().timeoutMs(2)).values(), classOf[TimeoutException])
+ val endTimeMs = Time.SYSTEM.milliseconds()
+ assertTrue(endTimeMs > startTimeMs, "Expected the timeout to take at least
one millisecond.")
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDeleteAclsWithOptionTimeoutMs(): Unit = {
+ val acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC,
"mytopic3", PatternType.LITERAL),
+ new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE,
AclPermissionType.ALLOW))
+ client = createAdminClient
+
+ client.createAcls(Collections.singleton(acl)).all()
+
+ val startTimeMs = Time.SYSTEM.milliseconds()
+
assertFutureExceptionTypeEquals(client.deleteAcls(Collections.singleton(AclBindingFilter.ANY),
new DeleteAclsOptions().timeoutMs(2)).all(), classOf[TimeoutException])
Review Comment:
ditto
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,249 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
super.tearDown()
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testAclOperationsWithOptionTimeoutMs(): Unit = {
+ val acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC,
"mytopic3", PatternType.LITERAL),
+ new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE,
AclPermissionType.ALLOW))
+ client = createAdminClient
+ val startTimeMs = Time.SYSTEM.milliseconds()
+
assertFutureExceptionTypeEquals(client.createAcls(Collections.singleton(acl),
new CreateAclsOptions().timeoutMs(2)).all(), classOf[TimeoutException])
+ val endTimeMs = Time.SYSTEM.milliseconds()
+ assertTrue(endTimeMs > startTimeMs, "Expected the timeout to take at least
one millisecond.")
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeAclsWithOptionTimeoutMs(): Unit = {
+ val acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC,
"mytopic3", PatternType.LITERAL),
+ new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE,
AclPermissionType.ALLOW))
+ client = createAdminClient
+ client.createAcls(Collections.singleton(acl)).all()
+ val startTimeMs = Time.SYSTEM.milliseconds()
+ assertFutureExceptionTypeEquals(client.describeAcls(AclBindingFilter.ANY,
new DescribeAclsOptions().timeoutMs(2)).values(), classOf[TimeoutException])
Review Comment:
ditto
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,249 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
super.tearDown()
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testAclOperationsWithOptionTimeoutMs(): Unit = {
+ val acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC,
"mytopic3", PatternType.LITERAL),
+ new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE,
AclPermissionType.ALLOW))
+ client = createAdminClient
+ val startTimeMs = Time.SYSTEM.milliseconds()
+
assertFutureExceptionTypeEquals(client.createAcls(Collections.singleton(acl),
new CreateAclsOptions().timeoutMs(2)).all(), classOf[TimeoutException])
+ val endTimeMs = Time.SYSTEM.milliseconds()
+ assertTrue(endTimeMs > startTimeMs, "Expected the timeout to take at least
one millisecond.")
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeAclsWithOptionTimeoutMs(): Unit = {
+ val acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC,
"mytopic3", PatternType.LITERAL),
+ new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE,
AclPermissionType.ALLOW))
+ client = createAdminClient
+ client.createAcls(Collections.singleton(acl)).all()
+ val startTimeMs = Time.SYSTEM.milliseconds()
+ assertFutureExceptionTypeEquals(client.describeAcls(AclBindingFilter.ANY,
new DescribeAclsOptions().timeoutMs(2)).values(), classOf[TimeoutException])
+ val endTimeMs = Time.SYSTEM.milliseconds()
+ assertTrue(endTimeMs > startTimeMs, "Expected the timeout to take at least
one millisecond.")
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDeleteAclsWithOptionTimeoutMs(): Unit = {
+ val acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC,
"mytopic3", PatternType.LITERAL),
+ new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE,
AclPermissionType.ALLOW))
+ client = createAdminClient
+
+ client.createAcls(Collections.singleton(acl)).all()
+
+ val startTimeMs = Time.SYSTEM.milliseconds()
+
assertFutureExceptionTypeEquals(client.deleteAcls(Collections.singleton(AclBindingFilter.ANY),
new DeleteAclsOptions().timeoutMs(2)).all(), classOf[TimeoutException])
+ val endTimeMs = Time.SYSTEM.milliseconds()
+ assertTrue(endTimeMs > startTimeMs, "Expected the timeout to take at least
one millisecond.")
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeConfigWithOptionTimeoutMs(): Unit = {
+ client = createAdminClient
+
+ // Create topics
+ val topic1 = "describe-alter-configs-topic-1"
+ val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1)
+ val topicConfig1 = new Properties
+ topicConfig1.setProperty(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, "500000")
+ topicConfig1.setProperty(TopicConfig.RETENTION_MS_CONFIG, "60000000")
+ createTopic(topic1, numPartitions = 1, replicationFactor = 1, topicConfig1)
+
+ val topic2 = "describe-alter-configs-topic-2"
+ val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
+ createTopic(topic2)
+
+ // Describe topics and broker
+ val brokerResource1 = new ConfigResource(ConfigResource.Type.BROKER,
brokers(1).config.brokerId.toString)
+ val brokerResource2 = new ConfigResource(ConfigResource.Type.BROKER,
brokers(2).config.brokerId.toString)
+ val configResources = Seq(topicResource1, topicResource2, brokerResource1,
brokerResource2)
+
+ val startTimeMs = Time.SYSTEM.milliseconds()
+ assertFutureExceptionTypeEquals(
+ client.describeConfigs(configResources.asJava,new
DescribeConfigsOptions().timeoutMs(2)).all(),
+ classOf[TimeoutException])
+ val endTimeMs = Time.SYSTEM.milliseconds()
+ assertTrue(endTimeMs > startTimeMs, "Expected the timeout to take at least
one millisecond.")
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testAlterConfigsWithOptionTimeoutMs(): Unit = {
+ val alterLogLevelsEntries = Seq(
+ new ConfigEntry("kafka.controller.KafkaController",
LogLevelConfig.INFO_LOG_LEVEL)
+ ).asJavaCollection
+ client = createAdminClient
+
+ val startTimeMs = Time.SYSTEM.milliseconds()
+ assertFutureExceptionTypeEquals(client.alterConfigs(
+ Map(brokerLoggerConfigResource -> new
Config(alterLogLevelsEntries)).asJava,
+ new AlterConfigsOptions().timeoutMs(2)).all(),
+ classOf[TimeoutException])
+ val endTimeMs = Time.SYSTEM.milliseconds()
+ assertTrue(endTimeMs > startTimeMs, "Expected the timeout to take at least
one millisecond.")
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCreatePartitionWithOptionRetryOnQuotaViolation(): Unit = {
+ // Override old configs
+ val props = TestUtils.createBrokerConfigs(brokers.size, zkConnectOrNull)
+ val finalProps = props.map{prop => {
+ prop.put(QuotaConfigs.NUM_QUOTA_SAMPLES_CONFIG, "5")
+ prop.put(QuotaConfigs.CONTROLLER_MUTATION_RATE_OVERRIDE_CONFIG, "1")
+ prop
+ }}
+ val conf = finalProps.map(prop => KafkaConfig.fromProps(prop))
+
+ instanceConfigs = conf
+
+ for (i <- 0 until brokerCount) {
+ killBroker(i)
+ }
+ restartDeadBrokers()
+
+ client = createAdminClient
+
+ val brokerIds = brokers.map(_.config.brokerId).toSet
+ brokerIds.foreach(id => TestUtils.waitForOnlineBroker(client, id))
+
+ println(brokers.head.config)
+
+ // fire createPartitions without retry, will fail
+ // add retry config, will suceess
+
+ // def partitions(topic: String, expectedNumPartitionsOpt:
Option[Int]): util.List[TopicPartitionInfo] = {
Review Comment:
any updates?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]