chia7712 commented on code in PR #16658:
URL: https://github.com/apache/kafka/pull/16658#discussion_r1709884524
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -2961,6 +2961,36 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
testAppendConfig(props, "0:0", "1:1,0:0")
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("quorum=kraft"))
+ def testListClientMetricsResources(ignored: String): Unit = {
+ client = createAdminClient
+ def newTopic = new NewTopic(topic, partition, 0.toShort)
+ client.createTopics(Collections.singleton(newTopic))
+ assertTrue(client.listClientMetricsResources().all().get().isEmpty)
+ def name = "name"
+ def configResource = new
ConfigResource(ConfigResource.Type.CLIENT_METRICS, name)
+ val configEntry = new ConfigEntry("interval.ms", "111")
+ def configOp = new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET)
+ client.incrementalAlterConfigs(Collections.singletonMap(configResource,
Collections.singletonList(configOp))).all().get()
+ def result = client.listClientMetricsResources().all().get()
+ def expected = Collections.singletonList(new
ClientMetricsResourceListing(name))
+ assertEquals(new util.HashSet(expected), new util.HashSet(result))
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("quorum=kraft"))
+ @Timeout(30)
+ def testListClientMetricsResourcesTimeoutMs(ignored: String): Unit = {
+ client = createInvalidAdminClient()
+ try {
+ def timeoutOption = new ListClientMetricsResourcesOptions().timeoutMs(0)
Review Comment:
ditto
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -2599,6 +2599,33 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
testAppendConfig(props, "0:0", "1:1,0:0")
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("quorum=kraft"))
+ def testListClientMetricsResources(ignored: String): Unit = {
Review Comment:
ok, that can work.
##########
core/src/test/scala/integration/kafka/admin/AdminFenceProducersIntegrationTest.scala:
##########
@@ -107,6 +108,37 @@ class AdminFenceProducersIntegrationTest extends
IntegrationTestHarness {
assertThrows(classOf[ProducerFencedException], () =>
producer.commitTransaction())
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testFenceProducerTimeoutMs(quorum: String): Unit = {
+ producer.initTransactions()
+ producer.beginTransaction()
+ producer.send(record).get()
+ producer.commitTransaction()
+ def timeoutOption = new FenceProducersOptions()
+ try {
+ adminClient.fenceProducers(Collections.singletonList(txnId),
timeoutOption.timeoutMs(0)).all().get()
Review Comment:
This will be flaky: the request can complete successfully if it happens to be
executed immediately. Please create an `Admin` with an invalid bootstrap
address and then send a request with timeout=0. In that scenario, the request
should fail with a timeout exception. See
https://github.com/apache/kafka/blob/0cbc5e083a936025a85f127102dc1032f6cf4fd9/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala#L159
for an example.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]