chia7712 commented on code in PR #16652:
URL: https://github.com/apache/kafka/pull/16652#discussion_r1704145064
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,364 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
super.tearDown()
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeUserScramCredentials(quorum: String): Unit = {
+ client = createAdminClient
+
+ // add a new user
+ val targetUserName = "tom"
+ client.alterUserScramCredentials(Collections.singletonList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all.get
+ TestUtils.waitUntilTrue(() =>
client.describeUserScramCredentials().all().get().size() == 1,
+ "Add one user scram credential timeout")
+
+ val result = client.describeUserScramCredentials().all().get()
+ result.forEach((userName, scramDescription) => {
+ assertEquals(targetUserName, userName)
+ assertEquals(targetUserName, scramDescription.name())
+ val credentialInfos = scramDescription.credentialInfos()
+ assertEquals(1, credentialInfos.size())
+ assertEquals(ScramMechanism.SCRAM_SHA_256,
credentialInfos.get(0).mechanism())
+ assertEquals(4096, credentialInfos.get(0).iterations())
+ })
+
+ // add other users
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion("tom2", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456"),
+ new UserScramCredentialUpsertion("tom3", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all().get
+ TestUtils.waitUntilTrue(() =>
client.describeUserScramCredentials().all().get().size() == 3,
+ "Add user scram credential timeout")
+
+ // alter user info
+ client.alterUserScramCredentials(Collections.singletonList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_512, 8192), "123456")
+ )).all.get
+ TestUtils.waitUntilTrue(() => {
+
client.describeUserScramCredentials().all().get().get(targetUserName).credentialInfos().size()
== 2
+ }, "Alter user scram credential timeout")
+
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ assertEquals(3, userTomResult.size())
+ val userScramCredential = userTomResult.get(targetUserName)
+ assertEquals(targetUserName, userScramCredential.name())
+ val credentialInfos = userScramCredential.credentialInfos()
+ assertEquals(2, credentialInfos.size())
+ val credentialList = credentialInfos.asScala.sortBy(s =>
s.mechanism().`type`())
+ assertEquals(ScramMechanism.SCRAM_SHA_256, credentialList.head.mechanism())
+ assertEquals(4096, credentialList.head.iterations())
+ assertEquals(ScramMechanism.SCRAM_SHA_512, credentialList(1).mechanism())
+ assertEquals(8192, credentialList(1).iterations())
+
+ // test describeUserScramCredentials(List<String> users)
+ val userAndScramMap =
client.describeUserScramCredentials(Collections.singletonList("tom2")).all().get()
+ assertEquals(1, userAndScramMap.size())
+ val scram = userAndScramMap.get("tom2")
+ assertNotNull(scram)
+ val credentialInfo = scram.credentialInfos().get(0)
+ assertEquals(ScramMechanism.SCRAM_SHA_256, credentialInfo.mechanism())
+ assertEquals(4096, credentialInfo.iterations())
+ }
+
+ private def createInvalidAdminClient(): Admin = {
+ val config = createConfig
+ config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
s"localhost:${TestUtils.IncorrectBrokerPort}")
+ Admin.create(config)
+ }
+
+ @ParameterizedTest
+ @Timeout(30)
Review Comment:
how about `@Timeout(10)`? In this test case we use 0 timeout and so 30
seconds is too large I feel.
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,364 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
super.tearDown()
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeUserScramCredentials(quorum: String): Unit = {
+ client = createAdminClient
+
+ // add a new user
+ val targetUserName = "tom"
+ client.alterUserScramCredentials(Collections.singletonList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all.get
+ TestUtils.waitUntilTrue(() =>
client.describeUserScramCredentials().all().get().size() == 1,
+ "Add one user scram credential timeout")
+
+ val result = client.describeUserScramCredentials().all().get()
+ result.forEach((userName, scramDescription) => {
+ assertEquals(targetUserName, userName)
+ assertEquals(targetUserName, scramDescription.name())
+ val credentialInfos = scramDescription.credentialInfos()
+ assertEquals(1, credentialInfos.size())
+ assertEquals(ScramMechanism.SCRAM_SHA_256,
credentialInfos.get(0).mechanism())
+ assertEquals(4096, credentialInfos.get(0).iterations())
+ })
+
+ // add other users
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion("tom2", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456"),
+ new UserScramCredentialUpsertion("tom3", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all().get
+ TestUtils.waitUntilTrue(() =>
client.describeUserScramCredentials().all().get().size() == 3,
+ "Add user scram credential timeout")
+
+ // alter user info
+ client.alterUserScramCredentials(Collections.singletonList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_512, 8192), "123456")
+ )).all.get
+ TestUtils.waitUntilTrue(() => {
+
client.describeUserScramCredentials().all().get().get(targetUserName).credentialInfos().size()
== 2
+ }, "Alter user scram credential timeout")
+
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ assertEquals(3, userTomResult.size())
+ val userScramCredential = userTomResult.get(targetUserName)
+ assertEquals(targetUserName, userScramCredential.name())
+ val credentialInfos = userScramCredential.credentialInfos()
+ assertEquals(2, credentialInfos.size())
+ val credentialList = credentialInfos.asScala.sortBy(s =>
s.mechanism().`type`())
+ assertEquals(ScramMechanism.SCRAM_SHA_256, credentialList.head.mechanism())
+ assertEquals(4096, credentialList.head.iterations())
+ assertEquals(ScramMechanism.SCRAM_SHA_512, credentialList(1).mechanism())
+ assertEquals(8192, credentialList(1).iterations())
+
+ // test describeUserScramCredentials(List<String> users)
+ val userAndScramMap =
client.describeUserScramCredentials(Collections.singletonList("tom2")).all().get()
+ assertEquals(1, userAndScramMap.size())
+ val scram = userAndScramMap.get("tom2")
+ assertNotNull(scram)
+ val credentialInfo = scram.credentialInfos().get(0)
+ assertEquals(ScramMechanism.SCRAM_SHA_256, credentialInfo.mechanism())
+ assertEquals(4096, credentialInfo.iterations())
+ }
+
+ private def createInvalidAdminClient(): Admin = {
+ val config = createConfig
+ config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
s"localhost:${TestUtils.IncorrectBrokerPort}")
+ Admin.create(config)
+ }
+
+ @ParameterizedTest
+ @Timeout(30)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeUserScramCredentialsTimeout(quorum: String): Unit = {
+ client = createInvalidAdminClient()
+ // test describeUserScramCredentials(List<String> users,
DescribeUserScramCredentialsOptions options)
+ val exception = assertThrows(classOf[ExecutionException], () => {
+ client.describeUserScramCredentials(Collections.singletonList("tom4"),
+ new DescribeUserScramCredentialsOptions().timeoutMs(0)).all().get()
+ })
+ assertInstanceOf(classOf[TimeoutException], exception.getCause)
+ client.close(time.Duration.ZERO)
Review Comment:
Could you please use try-finally to make sure it gets closed with 0 timeout?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]