chia7712 commented on code in PR #16652:
URL: https://github.com/apache/kafka/pull/16652#discussion_r1686731759
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,407 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
super.tearDown()
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeUserScramCredentials(quorum: String): Unit = {
+ client = createAdminClient
+
+ // add a new user
+ val targetUserName: String = "tom"
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all.get
+ org.apache.kafka.test.TestUtils.waitForCondition(new TestCondition() {
+ override def conditionMet(): Boolean = {
+ // waiting metadata sync
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ userTomResult.size() == 1
+ }
+ }, "add one user scram credential timeout")
+ val result = client.describeUserScramCredentials().all().get()
+ result.forEach((userName, scramDescription) => {
+ assertEquals(targetUserName, userName)
+ assertEquals(targetUserName, scramDescription.name())
+ val credentialInfos = scramDescription.credentialInfos()
+ assertEquals(1, credentialInfos.size())
+ assertEquals(ScramMechanism.SCRAM_SHA_256,
credentialInfos.get(0).mechanism())
+ assertEquals(4096, credentialInfos.get(0).iterations())
+ })
+
+ // add other users
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion("tom2", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456"),
+ new UserScramCredentialUpsertion("tom3", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all().get
+ org.apache.kafka.test.TestUtils.waitForCondition(new TestCondition() {
Review Comment:
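Ditto: please use `TestUtils.waitUntilTrue()` here as well, instead of `org.apache.kafka.test.TestUtils.waitForCondition` with an anonymous `TestCondition`.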
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,407 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
super.tearDown()
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeUserScramCredentials(quorum: String): Unit = {
+ client = createAdminClient
+
+ // add a new user
+ val targetUserName: String = "tom"
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all.get
+ org.apache.kafka.test.TestUtils.waitForCondition(new TestCondition() {
+ override def conditionMet(): Boolean = {
+ // waiting metadata sync
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ userTomResult.size() == 1
+ }
+ }, "add one user scram credential timeout")
+ val result = client.describeUserScramCredentials().all().get()
+ result.forEach((userName, scramDescription) => {
+ assertEquals(targetUserName, userName)
+ assertEquals(targetUserName, scramDescription.name())
+ val credentialInfos = scramDescription.credentialInfos()
+ assertEquals(1, credentialInfos.size())
+ assertEquals(ScramMechanism.SCRAM_SHA_256,
credentialInfos.get(0).mechanism())
+ assertEquals(4096, credentialInfos.get(0).iterations())
+ })
+
+ // add other users
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion("tom2", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456"),
+ new UserScramCredentialUpsertion("tom3", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all().get
+ org.apache.kafka.test.TestUtils.waitForCondition(new TestCondition() {
+ override def conditionMet(): Boolean = {
+ // waiting metadata sync
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ userTomResult.size() == 3
+ }
+ }, "add user scram credential timeout")
+ assertEquals(3, client.describeUserScramCredentials().all().get().size())
+
+ // alter user info
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_512, 8192), "123456")
+ )).all.get
+ org.apache.kafka.test.TestUtils.waitForCondition(new TestCondition() {
Review Comment:
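Ditto: please use `TestUtils.waitUntilTrue()` here as well, instead of `org.apache.kafka.test.TestUtils.waitForCondition` with an anonymous `TestCondition`.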
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,407 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
super.tearDown()
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeUserScramCredentials(quorum: String): Unit = {
+ client = createAdminClient
+
+ // add a new user
+ val targetUserName: String = "tom"
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all.get
+ org.apache.kafka.test.TestUtils.waitForCondition(new TestCondition() {
+ override def conditionMet(): Boolean = {
+ // waiting metadata sync
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ userTomResult.size() == 1
+ }
+ }, "add one user scram credential timeout")
+ val result = client.describeUserScramCredentials().all().get()
+ result.forEach((userName, scramDescription) => {
+ assertEquals(targetUserName, userName)
+ assertEquals(targetUserName, scramDescription.name())
+ val credentialInfos = scramDescription.credentialInfos()
+ assertEquals(1, credentialInfos.size())
+ assertEquals(ScramMechanism.SCRAM_SHA_256,
credentialInfos.get(0).mechanism())
+ assertEquals(4096, credentialInfos.get(0).iterations())
+ })
+
+ // add other users
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion("tom2", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456"),
+ new UserScramCredentialUpsertion("tom3", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all().get
+ org.apache.kafka.test.TestUtils.waitForCondition(new TestCondition() {
+ override def conditionMet(): Boolean = {
+ // waiting metadata sync
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ userTomResult.size() == 3
+ }
+ }, "add user scram credential timeout")
+ assertEquals(3, client.describeUserScramCredentials().all().get().size())
+
+ // alter user info
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_512, 8192), "123456")
+ )).all.get
+ org.apache.kafka.test.TestUtils.waitForCondition(new TestCondition() {
+ override def conditionMet(): Boolean = {
+ // waiting metadata sync
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ val userScramCredential = userTomResult.get(targetUserName)
+ val credentialInfos = userScramCredential.credentialInfos()
+ credentialInfos.size() == 2
+ }
+ }, "Alter user scram credential timeout")
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ assertEquals(3, userTomResult.size())
+ val userScramCredential = userTomResult.get(targetUserName)
+ assertEquals(targetUserName, userScramCredential.name())
+ val credentialInfos = userScramCredential.credentialInfos()
+ assertEquals(2, credentialInfos.size())
+ val credentialList = credentialInfos.asScala.sortBy(s =>
s.mechanism().`type`())
+ assertEquals(ScramMechanism.SCRAM_SHA_256, credentialList.head.mechanism())
+ assertEquals(4096, credentialList.head.iterations())
+ assertEquals(ScramMechanism.SCRAM_SHA_512, credentialList(1).mechanism())
+ assertEquals(8192, credentialList(1).iterations())
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeProducers(quorum: String): Unit = {
+ client = createAdminClient
+ val tp = new TopicPartition("topic1", 0)
+ client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1,
1.toShort))).all().get()
+
+ def appendCommonRecords = (records: Int) => {
+ val producer = new
KafkaProducer(Collections.singletonMap(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ plaintextBootstrapServers(brokers).asInstanceOf[Object]), new
ByteArraySerializer, new ByteArraySerializer)
+ try {
+ (0 until records).foreach(i =>
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), i.toString.getBytes,
i.toString.getBytes())))
+ } finally producer.close()
+ }
+
+ def appendTransactionRecords(transactionId: String, records: Int, commit:
Boolean): KafkaProducer[Array[Byte], Array[Byte]] = {
+ val producer = TestUtils.createTransactionalProducer(transactionId,
brokers)
+ producer.initTransactions()
+ producer.beginTransaction()
+ (0 until records).foreach(i =>
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), i.toString.getBytes,
i.toString.getBytes())))
+ producer.flush()
+ if (commit) {
+ producer.commitTransaction()
+ producer.close()
+ }
+
+ producer
+ }
+
+ def consumeToExpectedNumber = (expectedNumber: Int) => {
+ val configs = new util.HashMap[String, Object]()
+ configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
plaintextBootstrapServers(brokers))
+ configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.toString)
+ val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, new
ByteArrayDeserializer)
+ try {
+ consumer.assign(Collections.singleton(tp))
+ consumer.seekToBeginning(Collections.singleton(tp))
+ var consumeNum = 0
+ while (consumeNum < expectedNumber) {
+ val records = consumer.poll(time.Duration.ofMillis(100))
+ consumeNum += records.count()
+ }
+ } finally consumer.close()
+ }
+
+ def queryProducerDetail(): mutable.Buffer[ProducerState] = {
Review Comment:
We don't need to declare the return type explicitly, and the intermediate `val` is unnecessary. For example:
```scala
def queryProducerDetail() =
client.describeProducers(Collections.singletonList(tp))
.partitionResult(tp).get().activeProducers().asScala
```
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,407 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
super.tearDown()
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeUserScramCredentials(quorum: String): Unit = {
+ client = createAdminClient
+
+ // add a new user
+ val targetUserName: String = "tom"
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all.get
+ org.apache.kafka.test.TestUtils.waitForCondition(new TestCondition() {
+ override def conditionMet(): Boolean = {
+ // waiting metadata sync
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ userTomResult.size() == 1
+ }
+ }, "add one user scram credential timeout")
+ val result = client.describeUserScramCredentials().all().get()
+ result.forEach((userName, scramDescription) => {
+ assertEquals(targetUserName, userName)
+ assertEquals(targetUserName, scramDescription.name())
+ val credentialInfos = scramDescription.credentialInfos()
+ assertEquals(1, credentialInfos.size())
+ assertEquals(ScramMechanism.SCRAM_SHA_256,
credentialInfos.get(0).mechanism())
+ assertEquals(4096, credentialInfos.get(0).iterations())
+ })
+
+ // add other users
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion("tom2", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456"),
+ new UserScramCredentialUpsertion("tom3", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all().get
+ org.apache.kafka.test.TestUtils.waitForCondition(new TestCondition() {
+ override def conditionMet(): Boolean = {
+ // waiting metadata sync
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ userTomResult.size() == 3
+ }
+ }, "add user scram credential timeout")
+ assertEquals(3, client.describeUserScramCredentials().all().get().size())
+
+ // alter user info
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_512, 8192), "123456")
+ )).all.get
+ org.apache.kafka.test.TestUtils.waitForCondition(new TestCondition() {
+ override def conditionMet(): Boolean = {
+ // waiting metadata sync
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ val userScramCredential = userTomResult.get(targetUserName)
+ val credentialInfos = userScramCredential.credentialInfos()
+ credentialInfos.size() == 2
+ }
+ }, "Alter user scram credential timeout")
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ assertEquals(3, userTomResult.size())
+ val userScramCredential = userTomResult.get(targetUserName)
+ assertEquals(targetUserName, userScramCredential.name())
+ val credentialInfos = userScramCredential.credentialInfos()
+ assertEquals(2, credentialInfos.size())
+ val credentialList = credentialInfos.asScala.sortBy(s =>
s.mechanism().`type`())
+ assertEquals(ScramMechanism.SCRAM_SHA_256, credentialList.head.mechanism())
+ assertEquals(4096, credentialList.head.iterations())
+ assertEquals(ScramMechanism.SCRAM_SHA_512, credentialList(1).mechanism())
+ assertEquals(8192, credentialList(1).iterations())
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeProducers(quorum: String): Unit = {
+ client = createAdminClient
+ val tp = new TopicPartition("topic1", 0)
+ client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1,
1.toShort))).all().get()
+
+ def appendCommonRecords = (records: Int) => {
+ val producer = new
KafkaProducer(Collections.singletonMap(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ plaintextBootstrapServers(brokers).asInstanceOf[Object]), new
ByteArraySerializer, new ByteArraySerializer)
+ try {
+ (0 until records).foreach(i =>
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), i.toString.getBytes,
i.toString.getBytes())))
+ } finally producer.close()
+ }
+
+ def appendTransactionRecords(transactionId: String, records: Int, commit:
Boolean): KafkaProducer[Array[Byte], Array[Byte]] = {
+ val producer = TestUtils.createTransactionalProducer(transactionId,
brokers)
+ producer.initTransactions()
+ producer.beginTransaction()
+ (0 until records).foreach(i =>
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), i.toString.getBytes,
i.toString.getBytes())))
+ producer.flush()
+ if (commit) {
+ producer.commitTransaction()
+ producer.close()
+ }
+
+ producer
+ }
+
+ def consumeToExpectedNumber = (expectedNumber: Int) => {
+ val configs = new util.HashMap[String, Object]()
+ configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
plaintextBootstrapServers(brokers))
+ configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.toString)
+ val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, new
ByteArrayDeserializer)
+ try {
+ consumer.assign(Collections.singleton(tp))
+ consumer.seekToBeginning(Collections.singleton(tp))
+ var consumeNum = 0
+ while (consumeNum < expectedNumber) {
+ val records = consumer.poll(time.Duration.ofMillis(100))
+ consumeNum += records.count()
+ }
+ } finally consumer.close()
+ }
+
+ def queryProducerDetail(): mutable.Buffer[ProducerState] = {
+ val producerIterator =
client.describeProducers(Collections.singletonList(tp))
+ .partitionResult(tp).get().activeProducers().asScala
+ producerIterator
+ }
+
+ // send common msg
+ appendCommonRecords(1)
+ val producerIterator = queryProducerDetail()
+ assertEquals(1, producerIterator.size)
+ val producerState = producerIterator.last
+ assertEquals(0, producerState.producerEpoch())
+ assertFalse(producerState.coordinatorEpoch().isPresent)
+ assertFalse(producerState.currentTransactionStartOffset().isPresent)
+
+
+ // send committed transaction msg
+ appendTransactionRecords("foo", 2, commit = true)
+ // consume 3 records to ensure transaction finished
+ consumeToExpectedNumber(3)
+ val transactionProducerIterator = queryProducerDetail()
+ assertEquals(2, transactionProducerIterator.size)
+ val containsCoordinatorEpochIterator = transactionProducerIterator
+ .filter(producer => producer.coordinatorEpoch().isPresent)
+ assertEquals(1, containsCoordinatorEpochIterator.size)
+ val transactionProducerState = containsCoordinatorEpochIterator.last
+
assertFalse(transactionProducerState.currentTransactionStartOffset().isPresent)
+
+
+ // send ongoing transaction msg
+ val ongoingProducer = appendTransactionRecords("foo3", 3, commit = false)
+ val transactionNoneCommitProducerIterator = queryProducerDetail()
+ assertEquals(3, transactionNoneCommitProducerIterator.size)
+ val containsOngoingIterator = transactionNoneCommitProducerIterator
+ .filter(producer => producer.currentTransactionStartOffset().isPresent)
+ assertEquals(1, containsOngoingIterator.size)
+ val ongoingTransactionProducerState = containsOngoingIterator.last
+ // we send (1 common msg) + (2 transaction msg) + (1 transaction marker
msg), so transactionStartOffset is 4
+ assertEquals(4,
ongoingTransactionProducerState.currentTransactionStartOffset().getAsLong)
+ ongoingProducer.close()
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeTransactions(quorum: String): Unit = {
+ client = createAdminClient
+ val tp = new TopicPartition("topic1", 0)
+ client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1,
1.toShort))).all().get()
+
+ var transactionId = "foo"
+
+ def consumeToExpectedNumber = (expectedNumber: Int) => {
+ val configs = new util.HashMap[String, Object]()
+ configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
plaintextBootstrapServers(brokers))
+ configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.toString)
+ val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, new
ByteArrayDeserializer)
+ try {
+ consumer.assign(Collections.singleton(tp))
+ consumer.seekToBeginning(Collections.singleton(tp))
+ var consumeNum = 0
+ while (consumeNum < expectedNumber) {
+ val records = consumer.poll(time.Duration.ofMillis(100))
+ consumeNum += records.count()
+ }
+ } finally consumer.close()
+ }
+
+ def transactionState(): TransactionState = {
+ val transactionResult =
client.describeTransactions(Collections.singleton(transactionId))
+ .description(transactionId).get()
+ transactionResult.state()
+ }
+
+ // normal commit case
+ val producer = TestUtils.createTransactionalProducer(transactionId,
brokers)
+ try {
+ // init, the transaction is not begin, so
TransactionalIdNotFoundException is expected
+ val exception = assertThrows(classOf[ExecutionException], () =>
transactionState())
+
assertTrue(exception.getCause.isInstanceOf[TransactionalIdNotFoundException])
+
+ producer.initTransactions()
+ assertEquals(TransactionState.EMPTY, transactionState())
+ producer.beginTransaction()
+ assertEquals(TransactionState.EMPTY, transactionState())
+
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ producer.flush()
+ assertEquals(TransactionState.ONGOING, transactionState())
+ producer.commitTransaction()
+
+ val state = transactionState()
+ // Either PREPARE_COMMIT or COMPLETE_COMMIT is expected
+ assertTrue(state == TransactionState.PREPARE_COMMIT || state ==
TransactionState.COMPLETE_COMMIT)
+ // producer commit transaction, but maybe transaction coordinator has
not been submitted mark msg
+ // so we start up a consumer and consume the expected number of msg, to
ensure transaction committed
+ consumeToExpectedNumber(1)
+ assertEquals(TransactionState.COMPLETE_COMMIT, transactionState())
+ } finally producer.close()
+
+ // abort case
+ transactionId = "foo2"
+ val abortProducer = TestUtils.createTransactionalProducer(transactionId,
brokers)
+ try {
+ // init, the transaction is not begin, so
TransactionalIdNotFoundException is expected
+ val exception = assertThrows(classOf[ExecutionException], () =>
transactionState())
+
assertTrue(exception.getCause.isInstanceOf[TransactionalIdNotFoundException])
+
+ abortProducer.initTransactions()
+ assertEquals(TransactionState.EMPTY, transactionState())
+ abortProducer.beginTransaction()
+ assertEquals(TransactionState.EMPTY, transactionState())
+
+ abortProducer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ abortProducer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k2".getBytes, "v2".getBytes()))
+ abortProducer.flush()
+ assertEquals(TransactionState.ONGOING, transactionState())
+ abortProducer.abortTransaction()
+ val state = transactionState()
+ assertTrue(state == TransactionState.PREPARE_ABORT || state ==
TransactionState.COMPLETE_ABORT)
+ // producer commit transaction, but maybe transaction coordinator has
not been submitted mark msg
+ // so we start up a consumer and consume the expected number of msg, to
ensure transaction committed
+ consumeToExpectedNumber(1)
+ assertEquals(TransactionState.COMPLETE_ABORT, transactionState())
+ } finally abortProducer.close()
+ }
+
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeTransactionsTimeoutMs(quorum: String): Unit = {
+ client = createAdminClient
+ val tp = new TopicPartition("topic1", 0)
+ client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1,
1.toShort))).all().get()
+
+ val transactionId = "foo"
+
+ val producer = TestUtils.createTransactionalProducer(transactionId,
brokers)
+ try {
+ producer.initTransactions()
+ producer.beginTransaction()
+
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ producer.flush()
+ producer.commitTransaction()
+
+
+ val exception = assertThrows(classOf[ExecutionException], () => {
+ val timeoutOptions = new DescribeTransactionsOptions()
+ timeoutOptions.timeoutMs(0)
+ client.describeTransactions(Collections.singleton(transactionId),
timeoutOptions)
+ .description(transactionId).get()
+ })
+ assertTrue(exception.getCause.isInstanceOf[TimeoutException])
Review Comment:
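Please use `assertInstanceOf` instead of `assertTrue(... .isInstanceOf[...])`, for example `assertInstanceOf(classOf[TimeoutException], exception.getCause)`.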
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,407 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
super.tearDown()
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeUserScramCredentials(quorum: String): Unit = {
+ client = createAdminClient
+
+ // add a new user
+ val targetUserName: String = "tom"
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all.get
+ org.apache.kafka.test.TestUtils.waitForCondition(new TestCondition() {
Review Comment:
This is Scala code, so please use the Scala helper `TestUtils.waitUntilTrue()` instead of the Java-style `org.apache.kafka.test.TestUtils.waitForCondition`.
For instance:
```scala
TestUtils.waitUntilTrue(() =>
client.describeUserScramCredentials().all().get().size() == 1,
"add one user scram credential timeout")
```
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,407 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
super.tearDown()
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeUserScramCredentials(quorum: String): Unit = {
+ client = createAdminClient
+
+ // add a new user
+ val targetUserName: String = "tom"
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all.get
+ org.apache.kafka.test.TestUtils.waitForCondition(new TestCondition() {
+ override def conditionMet(): Boolean = {
+ // waiting metadata sync
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ userTomResult.size() == 1
+ }
+ }, "add one user scram credential timeout")
+ val result = client.describeUserScramCredentials().all().get()
+ result.forEach((userName, scramDescription) => {
+ assertEquals(targetUserName, userName)
+ assertEquals(targetUserName, scramDescription.name())
+ val credentialInfos = scramDescription.credentialInfos()
+ assertEquals(1, credentialInfos.size())
+ assertEquals(ScramMechanism.SCRAM_SHA_256,
credentialInfos.get(0).mechanism())
+ assertEquals(4096, credentialInfos.get(0).iterations())
+ })
+
+ // add other users
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion("tom2", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456"),
+ new UserScramCredentialUpsertion("tom3", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all().get
+ org.apache.kafka.test.TestUtils.waitForCondition(new TestCondition() {
+ override def conditionMet(): Boolean = {
+ // waiting metadata sync
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ userTomResult.size() == 3
+ }
+ }, "add user scram credential timeout")
+ assertEquals(3, client.describeUserScramCredentials().all().get().size())
+
+ // alter user info
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_512, 8192), "123456")
+ )).all.get
+ org.apache.kafka.test.TestUtils.waitForCondition(new TestCondition() {
+ override def conditionMet(): Boolean = {
+ // waiting metadata sync
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ val userScramCredential = userTomResult.get(targetUserName)
+ val credentialInfos = userScramCredential.credentialInfos()
+ credentialInfos.size() == 2
+ }
+ }, "Alter user scram credential timeout")
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ assertEquals(3, userTomResult.size())
+ val userScramCredential = userTomResult.get(targetUserName)
+ assertEquals(targetUserName, userScramCredential.name())
+ val credentialInfos = userScramCredential.credentialInfos()
+ assertEquals(2, credentialInfos.size())
+ val credentialList = credentialInfos.asScala.sortBy(s =>
s.mechanism().`type`())
+ assertEquals(ScramMechanism.SCRAM_SHA_256, credentialList.head.mechanism())
+ assertEquals(4096, credentialList.head.iterations())
+ assertEquals(ScramMechanism.SCRAM_SHA_512, credentialList(1).mechanism())
+ assertEquals(8192, credentialList(1).iterations())
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeProducers(quorum: String): Unit = {
+ client = createAdminClient
+ val tp = new TopicPartition("topic1", 0)
+ client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1,
1.toShort))).all().get()
+
+ def appendCommonRecords = (records: Int) => {
+ val producer = new
KafkaProducer(Collections.singletonMap(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ plaintextBootstrapServers(brokers).asInstanceOf[Object]), new
ByteArraySerializer, new ByteArraySerializer)
+ try {
+ (0 until records).foreach(i =>
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), i.toString.getBytes,
i.toString.getBytes())))
+ } finally producer.close()
+ }
+
+ def appendTransactionRecords(transactionId: String, records: Int, commit:
Boolean): KafkaProducer[Array[Byte], Array[Byte]] = {
+ val producer = TestUtils.createTransactionalProducer(transactionId,
brokers)
+ producer.initTransactions()
+ producer.beginTransaction()
+ (0 until records).foreach(i =>
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), i.toString.getBytes,
i.toString.getBytes())))
+ producer.flush()
+ if (commit) {
+ producer.commitTransaction()
+ producer.close()
+ }
+
+ producer
+ }
+
+ def consumeToExpectedNumber = (expectedNumber: Int) => {
+ val configs = new util.HashMap[String, Object]()
+ configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
plaintextBootstrapServers(brokers))
+ configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.toString)
+ val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, new
ByteArrayDeserializer)
+ try {
+ consumer.assign(Collections.singleton(tp))
+ consumer.seekToBeginning(Collections.singleton(tp))
+ var consumeNum = 0
+ while (consumeNum < expectedNumber) {
+ val records = consumer.poll(time.Duration.ofMillis(100))
+ consumeNum += records.count()
+ }
+ } finally consumer.close()
+ }
+
+ def queryProducerDetail(): mutable.Buffer[ProducerState] = {
+ val producerIterator =
client.describeProducers(Collections.singletonList(tp))
+ .partitionResult(tp).get().activeProducers().asScala
+ producerIterator
+ }
+
+ // send common msg
+ appendCommonRecords(1)
+ val producerIterator = queryProducerDetail()
+ assertEquals(1, producerIterator.size)
+ val producerState = producerIterator.last
+ assertEquals(0, producerState.producerEpoch())
+ assertFalse(producerState.coordinatorEpoch().isPresent)
+ assertFalse(producerState.currentTransactionStartOffset().isPresent)
+
+
+ // send committed transaction msg
+ appendTransactionRecords("foo", 2, commit = true)
+ // consume 3 records to ensure transaction finished
+ consumeToExpectedNumber(3)
+ val transactionProducerIterator = queryProducerDetail()
+ assertEquals(2, transactionProducerIterator.size)
+ val containsCoordinatorEpochIterator = transactionProducerIterator
+ .filter(producer => producer.coordinatorEpoch().isPresent)
+ assertEquals(1, containsCoordinatorEpochIterator.size)
+ val transactionProducerState = containsCoordinatorEpochIterator.last
+
assertFalse(transactionProducerState.currentTransactionStartOffset().isPresent)
+
+
+ // send ongoing transaction msg
+ val ongoingProducer = appendTransactionRecords("foo3", 3, commit = false)
+ val transactionNoneCommitProducerIterator = queryProducerDetail()
+ assertEquals(3, transactionNoneCommitProducerIterator.size)
+ val containsOngoingIterator = transactionNoneCommitProducerIterator
+ .filter(producer => producer.currentTransactionStartOffset().isPresent)
+ assertEquals(1, containsOngoingIterator.size)
+ val ongoingTransactionProducerState = containsOngoingIterator.last
+ // we send (1 common msg) + (2 transaction msg) + (1 transaction marker
msg), so transactionStartOffset is 4
+ assertEquals(4,
ongoingTransactionProducerState.currentTransactionStartOffset().getAsLong)
+ ongoingProducer.close()
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeTransactions(quorum: String): Unit = {
+ client = createAdminClient
+ val tp = new TopicPartition("topic1", 0)
+ client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1,
1.toShort))).all().get()
+
+ var transactionId = "foo"
+
+ def consumeToExpectedNumber = (expectedNumber: Int) => {
+ val configs = new util.HashMap[String, Object]()
+ configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
plaintextBootstrapServers(brokers))
+ configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.toString)
+ val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, new
ByteArrayDeserializer)
+ try {
+ consumer.assign(Collections.singleton(tp))
+ consumer.seekToBeginning(Collections.singleton(tp))
+ var consumeNum = 0
+ while (consumeNum < expectedNumber) {
+ val records = consumer.poll(time.Duration.ofMillis(100))
+ consumeNum += records.count()
+ }
+ } finally consumer.close()
+ }
+
+ def transactionState(): TransactionState = {
Review Comment:
Could you please also verify the other fields of the description returned by `describeTransactions`, not only `state()`?
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,407 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
super.tearDown()
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeUserScramCredentials(quorum: String): Unit = {
+ client = createAdminClient
+
+ // add a new user
+ val targetUserName: String = "tom"
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all.get
+ org.apache.kafka.test.TestUtils.waitForCondition(new TestCondition() {
+ override def conditionMet(): Boolean = {
+ // waiting metadata sync
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ userTomResult.size() == 1
+ }
+ }, "add one user scram credential timeout")
+ val result = client.describeUserScramCredentials().all().get()
+ result.forEach((userName, scramDescription) => {
+ assertEquals(targetUserName, userName)
+ assertEquals(targetUserName, scramDescription.name())
+ val credentialInfos = scramDescription.credentialInfos()
+ assertEquals(1, credentialInfos.size())
+ assertEquals(ScramMechanism.SCRAM_SHA_256,
credentialInfos.get(0).mechanism())
+ assertEquals(4096, credentialInfos.get(0).iterations())
+ })
+
+ // add other users
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion("tom2", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456"),
+ new UserScramCredentialUpsertion("tom3", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all().get
+ org.apache.kafka.test.TestUtils.waitForCondition(new TestCondition() {
+ override def conditionMet(): Boolean = {
+ // waiting metadata sync
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ userTomResult.size() == 3
+ }
+ }, "add user scram credential timeout")
+ assertEquals(3, client.describeUserScramCredentials().all().get().size())
+
+ // alter user info
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_512, 8192), "123456")
+ )).all.get
+ org.apache.kafka.test.TestUtils.waitForCondition(new TestCondition() {
+ override def conditionMet(): Boolean = {
+ // waiting metadata sync
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ val userScramCredential = userTomResult.get(targetUserName)
+ val credentialInfos = userScramCredential.credentialInfos()
+ credentialInfos.size() == 2
+ }
+ }, "Alter user scram credential timeout")
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ assertEquals(3, userTomResult.size())
+ val userScramCredential = userTomResult.get(targetUserName)
+ assertEquals(targetUserName, userScramCredential.name())
+ val credentialInfos = userScramCredential.credentialInfos()
+ assertEquals(2, credentialInfos.size())
+ val credentialList = credentialInfos.asScala.sortBy(s =>
s.mechanism().`type`())
+ assertEquals(ScramMechanism.SCRAM_SHA_256, credentialList.head.mechanism())
+ assertEquals(4096, credentialList.head.iterations())
+ assertEquals(ScramMechanism.SCRAM_SHA_512, credentialList(1).mechanism())
+ assertEquals(8192, credentialList(1).iterations())
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeProducers(quorum: String): Unit = {
+ client = createAdminClient
+ val tp = new TopicPartition("topic1", 0)
+ client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1,
1.toShort))).all().get()
+
+ def appendCommonRecords = (records: Int) => {
+ val producer = new
KafkaProducer(Collections.singletonMap(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ plaintextBootstrapServers(brokers).asInstanceOf[Object]), new
ByteArraySerializer, new ByteArraySerializer)
+ try {
+ (0 until records).foreach(i =>
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), i.toString.getBytes,
i.toString.getBytes())))
+ } finally producer.close()
+ }
+
+ def appendTransactionRecords(transactionId: String, records: Int, commit:
Boolean): KafkaProducer[Array[Byte], Array[Byte]] = {
+ val producer = TestUtils.createTransactionalProducer(transactionId,
brokers)
+ producer.initTransactions()
+ producer.beginTransaction()
+ (0 until records).foreach(i =>
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), i.toString.getBytes,
i.toString.getBytes())))
+ producer.flush()
+ if (commit) {
+ producer.commitTransaction()
+ producer.close()
+ }
+
+ producer
+ }
+
+ def consumeToExpectedNumber = (expectedNumber: Int) => {
+ val configs = new util.HashMap[String, Object]()
+ configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
plaintextBootstrapServers(brokers))
+ configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.toString)
+ val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, new
ByteArrayDeserializer)
+ try {
+ consumer.assign(Collections.singleton(tp))
+ consumer.seekToBeginning(Collections.singleton(tp))
+ var consumeNum = 0
+ while (consumeNum < expectedNumber) {
+ val records = consumer.poll(time.Duration.ofMillis(100))
+ consumeNum += records.count()
+ }
+ } finally consumer.close()
+ }
+
+ def queryProducerDetail(): mutable.Buffer[ProducerState] = {
+ val producerIterator =
client.describeProducers(Collections.singletonList(tp))
+ .partitionResult(tp).get().activeProducers().asScala
+ producerIterator
+ }
+
+ // send common msg
+ appendCommonRecords(1)
+ val producerIterator = queryProducerDetail()
+ assertEquals(1, producerIterator.size)
+ val producerState = producerIterator.last
+ assertEquals(0, producerState.producerEpoch())
+ assertFalse(producerState.coordinatorEpoch().isPresent)
+ assertFalse(producerState.currentTransactionStartOffset().isPresent)
+
+
+ // send committed transaction msg
+ appendTransactionRecords("foo", 2, commit = true)
+ // consume 3 records to ensure transaction finished
+ consumeToExpectedNumber(3)
+ val transactionProducerIterator = queryProducerDetail()
+ assertEquals(2, transactionProducerIterator.size)
+ val containsCoordinatorEpochIterator = transactionProducerIterator
+ .filter(producer => producer.coordinatorEpoch().isPresent)
+ assertEquals(1, containsCoordinatorEpochIterator.size)
+ val transactionProducerState = containsCoordinatorEpochIterator.last
+
assertFalse(transactionProducerState.currentTransactionStartOffset().isPresent)
+
+
+ // send ongoing transaction msg
+ val ongoingProducer = appendTransactionRecords("foo3", 3, commit = false)
+ val transactionNoneCommitProducerIterator = queryProducerDetail()
+ assertEquals(3, transactionNoneCommitProducerIterator.size)
+ val containsOngoingIterator = transactionNoneCommitProducerIterator
+ .filter(producer => producer.currentTransactionStartOffset().isPresent)
+ assertEquals(1, containsOngoingIterator.size)
+ val ongoingTransactionProducerState = containsOngoingIterator.last
+ // we send (1 common msg) + (2 transaction msg) + (1 transaction marker
msg), so transactionStartOffset is 4
+ assertEquals(4,
ongoingTransactionProducerState.currentTransactionStartOffset().getAsLong)
+ ongoingProducer.close()
Review Comment:
Please use try-finally to make sure `ongoingProducer` gets closed even if one of the assertions above fails.
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,407 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
super.tearDown()
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeUserScramCredentials(quorum: String): Unit = {
+ client = createAdminClient
+
+ // add a new user
+ val targetUserName: String = "tom"
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all.get
+ org.apache.kafka.test.TestUtils.waitForCondition(new TestCondition() {
+ override def conditionMet(): Boolean = {
+ // waiting metadata sync
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ userTomResult.size() == 1
+ }
+ }, "add one user scram credential timeout")
+ val result = client.describeUserScramCredentials().all().get()
+ result.forEach((userName, scramDescription) => {
+ assertEquals(targetUserName, userName)
+ assertEquals(targetUserName, scramDescription.name())
+ val credentialInfos = scramDescription.credentialInfos()
+ assertEquals(1, credentialInfos.size())
+ assertEquals(ScramMechanism.SCRAM_SHA_256,
credentialInfos.get(0).mechanism())
+ assertEquals(4096, credentialInfos.get(0).iterations())
+ })
+
+ // add other users
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion("tom2", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456"),
+ new UserScramCredentialUpsertion("tom3", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all().get
+ org.apache.kafka.test.TestUtils.waitForCondition(new TestCondition() {
+ override def conditionMet(): Boolean = {
+ // waiting metadata sync
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ userTomResult.size() == 3
+ }
+ }, "add user scram credential timeout")
+ assertEquals(3, client.describeUserScramCredentials().all().get().size())
+
+ // alter user info
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_512, 8192), "123456")
+ )).all.get
+ org.apache.kafka.test.TestUtils.waitForCondition(new TestCondition() {
+ override def conditionMet(): Boolean = {
+ // waiting metadata sync
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ val userScramCredential = userTomResult.get(targetUserName)
+ val credentialInfos = userScramCredential.credentialInfos()
+ credentialInfos.size() == 2
+ }
+ }, "Alter user scram credential timeout")
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ assertEquals(3, userTomResult.size())
+ val userScramCredential = userTomResult.get(targetUserName)
+ assertEquals(targetUserName, userScramCredential.name())
+ val credentialInfos = userScramCredential.credentialInfos()
+ assertEquals(2, credentialInfos.size())
+ val credentialList = credentialInfos.asScala.sortBy(s =>
s.mechanism().`type`())
+ assertEquals(ScramMechanism.SCRAM_SHA_256, credentialList.head.mechanism())
+ assertEquals(4096, credentialList.head.iterations())
+ assertEquals(ScramMechanism.SCRAM_SHA_512, credentialList(1).mechanism())
+ assertEquals(8192, credentialList(1).iterations())
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeProducers(quorum: String): Unit = {
+ client = createAdminClient
+ val tp = new TopicPartition("topic1", 0)
+ client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1,
1.toShort))).all().get()
+
+ def appendCommonRecords = (records: Int) => {
+ val producer = new
KafkaProducer(Collections.singletonMap(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ plaintextBootstrapServers(brokers).asInstanceOf[Object]), new
ByteArraySerializer, new ByteArraySerializer)
+ try {
+ (0 until records).foreach(i =>
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), i.toString.getBytes,
i.toString.getBytes())))
+ } finally producer.close()
+ }
+
+ def appendTransactionRecords(transactionId: String, records: Int, commit:
Boolean): KafkaProducer[Array[Byte], Array[Byte]] = {
+ val producer = TestUtils.createTransactionalProducer(transactionId,
brokers)
+ producer.initTransactions()
+ producer.beginTransaction()
+ (0 until records).foreach(i =>
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), i.toString.getBytes,
i.toString.getBytes())))
+ producer.flush()
+ if (commit) {
+ producer.commitTransaction()
+ producer.close()
+ }
+
+ producer
+ }
+
+ def consumeToExpectedNumber = (expectedNumber: Int) => {
+ val configs = new util.HashMap[String, Object]()
+ configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
plaintextBootstrapServers(brokers))
+ configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.toString)
+ val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, new
ByteArrayDeserializer)
+ try {
+ consumer.assign(Collections.singleton(tp))
+ consumer.seekToBeginning(Collections.singleton(tp))
+ var consumeNum = 0
+ while (consumeNum < expectedNumber) {
+ val records = consumer.poll(time.Duration.ofMillis(100))
+ consumeNum += records.count()
+ }
+ } finally consumer.close()
+ }
+
+ def queryProducerDetail(): mutable.Buffer[ProducerState] = {
+ val producerIterator =
client.describeProducers(Collections.singletonList(tp))
+ .partitionResult(tp).get().activeProducers().asScala
+ producerIterator
+ }
+
+ // send common msg
+ appendCommonRecords(1)
+ val producerIterator = queryProducerDetail()
+ assertEquals(1, producerIterator.size)
+ val producerState = producerIterator.last
+ assertEquals(0, producerState.producerEpoch())
+ assertFalse(producerState.coordinatorEpoch().isPresent)
+ assertFalse(producerState.currentTransactionStartOffset().isPresent)
+
+
+ // send committed transaction msg
+ appendTransactionRecords("foo", 2, commit = true)
+ // consume 3 records to ensure transaction finished
+ consumeToExpectedNumber(3)
+ val transactionProducerIterator = queryProducerDetail()
+ assertEquals(2, transactionProducerIterator.size)
+ val containsCoordinatorEpochIterator = transactionProducerIterator
+ .filter(producer => producer.coordinatorEpoch().isPresent)
+ assertEquals(1, containsCoordinatorEpochIterator.size)
+ val transactionProducerState = containsCoordinatorEpochIterator.last
+
assertFalse(transactionProducerState.currentTransactionStartOffset().isPresent)
+
+
+ // send ongoing transaction msg
+ val ongoingProducer = appendTransactionRecords("foo3", 3, commit = false)
+ val transactionNoneCommitProducerIterator = queryProducerDetail()
+ assertEquals(3, transactionNoneCommitProducerIterator.size)
+ val containsOngoingIterator = transactionNoneCommitProducerIterator
+ .filter(producer => producer.currentTransactionStartOffset().isPresent)
+ assertEquals(1, containsOngoingIterator.size)
+ val ongoingTransactionProducerState = containsOngoingIterator.last
+ // we send (1 common msg) + (2 transaction msg) + (1 transaction marker
msg), so transactionStartOffset is 4
+ assertEquals(4,
ongoingTransactionProducerState.currentTransactionStartOffset().getAsLong)
+ ongoingProducer.close()
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeTransactions(quorum: String): Unit = {
+ client = createAdminClient
+ val tp = new TopicPartition("topic1", 0)
+ client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1,
1.toShort))).all().get()
+
+ var transactionId = "foo"
+
+ def consumeToExpectedNumber = (expectedNumber: Int) => {
+ val configs = new util.HashMap[String, Object]()
+ configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
plaintextBootstrapServers(brokers))
+ configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.toString)
+ val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, new
ByteArrayDeserializer)
+ try {
+ consumer.assign(Collections.singleton(tp))
+ consumer.seekToBeginning(Collections.singleton(tp))
+ var consumeNum = 0
+ while (consumeNum < expectedNumber) {
+ val records = consumer.poll(time.Duration.ofMillis(100))
+ consumeNum += records.count()
+ }
+ } finally consumer.close()
+ }
+
+ def transactionState(): TransactionState = {
+ val transactionResult =
client.describeTransactions(Collections.singleton(transactionId))
+ .description(transactionId).get()
+ transactionResult.state()
+ }
+
+ // normal commit case
+ val producer = TestUtils.createTransactionalProducer(transactionId,
brokers)
+ try {
+ // init, the transaction is not begin, so
TransactionalIdNotFoundException is expected
+ val exception = assertThrows(classOf[ExecutionException], () =>
transactionState())
+
assertTrue(exception.getCause.isInstanceOf[TransactionalIdNotFoundException])
+
+ producer.initTransactions()
+ assertEquals(TransactionState.EMPTY, transactionState())
+ producer.beginTransaction()
+ assertEquals(TransactionState.EMPTY, transactionState())
+
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ producer.flush()
+ assertEquals(TransactionState.ONGOING, transactionState())
+ producer.commitTransaction()
+
+ val state = transactionState()
+ // Either PREPARE_COMMIT or COMPLETE_COMMIT is expected
+ assertTrue(state == TransactionState.PREPARE_COMMIT || state ==
TransactionState.COMPLETE_COMMIT)
+ // producer commit transaction, but maybe transaction coordinator has
not been submitted mark msg
+ // so we start up a consumer and consume the expected number of msg, to
ensure transaction committed
+ consumeToExpectedNumber(1)
+ assertEquals(TransactionState.COMPLETE_COMMIT, transactionState())
+ } finally producer.close()
+
+ // abort case
+ transactionId = "foo2"
+ val abortProducer = TestUtils.createTransactionalProducer(transactionId,
brokers)
+ try {
+ // init, the transaction is not begin, so
TransactionalIdNotFoundException is expected
+ val exception = assertThrows(classOf[ExecutionException], () =>
transactionState())
+
assertTrue(exception.getCause.isInstanceOf[TransactionalIdNotFoundException])
+
+ abortProducer.initTransactions()
+ assertEquals(TransactionState.EMPTY, transactionState())
+ abortProducer.beginTransaction()
+ assertEquals(TransactionState.EMPTY, transactionState())
+
+ abortProducer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ abortProducer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k2".getBytes, "v2".getBytes()))
+ abortProducer.flush()
+ assertEquals(TransactionState.ONGOING, transactionState())
+ abortProducer.abortTransaction()
+ val state = transactionState()
+ assertTrue(state == TransactionState.PREPARE_ABORT || state ==
TransactionState.COMPLETE_ABORT)
+ // producer commit transaction, but maybe transaction coordinator has
not been submitted mark msg
+ // so we start up a consumer and consume the expected number of msg, to
ensure transaction committed
+ consumeToExpectedNumber(1)
+ assertEquals(TransactionState.COMPLETE_ABORT, transactionState())
+ } finally abortProducer.close()
+ }
+
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeTransactionsTimeoutMs(quorum: String): Unit = {
+ client = createAdminClient
+ val tp = new TopicPartition("topic1", 0)
+ client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1,
1.toShort))).all().get()
+
+ val transactionId = "foo"
+
+ val producer = TestUtils.createTransactionalProducer(transactionId,
brokers)
+ try {
+ producer.initTransactions()
+ producer.beginTransaction()
+
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ producer.flush()
+ producer.commitTransaction()
+
+
+ val exception = assertThrows(classOf[ExecutionException], () => {
+ val timeoutOptions = new DescribeTransactionsOptions()
+ timeoutOptions.timeoutMs(0)
+ client.describeTransactions(Collections.singleton(transactionId),
timeoutOptions)
+ .description(transactionId).get()
+ })
+ assertTrue(exception.getCause.isInstanceOf[TimeoutException])
+ } finally producer.close()
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testAbortTransactionTimeout(quorum: String): Unit = {
+ client = createAdminClient
+ val tp = new TopicPartition("topic1", 0)
+ client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1,
1.toShort))).all().get()
+
+ val producer = TestUtils.createTransactionalProducer("foo", brokers)
+ try {
+ producer.initTransactions()
+ producer.beginTransaction()
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ producer.flush()
+ producer.commitTransaction()
+
+ producer.beginTransaction()
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k2".getBytes, "v2".getBytes()))
+ producer.flush()
+
+ val transactionalProducer =
client.describeProducers(Collections.singletonList(tp))
+
.partitionResult(tp).get().activeProducers().asScala.minBy(_.producerId())
+
+ val exception = assertThrows(classOf[ExecutionException], () => {
+ val options = new AbortTransactionOptions()
+ options.timeoutMs(0)
+ client.abortTransaction(
+ new AbortTransactionSpec(tp,
+ transactionalProducer.producerId(),
+ transactionalProducer.producerEpoch().toShort,
+ transactionalProducer.coordinatorEpoch().getAsInt),
options).all().get()
+ })
+ assertTrue(exception.getCause.isInstanceOf[TimeoutException])
Review Comment:
ditto
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,407 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
super.tearDown()
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeUserScramCredentials(quorum: String): Unit = {
+ client = createAdminClient
+
+ // add a new user
+ val targetUserName: String = "tom"
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all.get
+ org.apache.kafka.test.TestUtils.waitForCondition(new TestCondition() {
+ override def conditionMet(): Boolean = {
+ // waiting metadata sync
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ userTomResult.size() == 1
+ }
+ }, "add one user scram credential timeout")
+ val result = client.describeUserScramCredentials().all().get()
+ result.forEach((userName, scramDescription) => {
+ assertEquals(targetUserName, userName)
+ assertEquals(targetUserName, scramDescription.name())
+ val credentialInfos = scramDescription.credentialInfos()
+ assertEquals(1, credentialInfos.size())
+ assertEquals(ScramMechanism.SCRAM_SHA_256,
credentialInfos.get(0).mechanism())
+ assertEquals(4096, credentialInfos.get(0).iterations())
+ })
+
+ // add other users
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion("tom2", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456"),
+ new UserScramCredentialUpsertion("tom3", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all().get
+ org.apache.kafka.test.TestUtils.waitForCondition(new TestCondition() {
+ override def conditionMet(): Boolean = {
+ // waiting metadata sync
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ userTomResult.size() == 3
+ }
+ }, "add user scram credential timeout")
+ assertEquals(3, client.describeUserScramCredentials().all().get().size())
+
+ // alter user info
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_512, 8192), "123456")
+ )).all.get
+ org.apache.kafka.test.TestUtils.waitForCondition(new TestCondition() {
+ override def conditionMet(): Boolean = {
+ // waiting metadata sync
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ val userScramCredential = userTomResult.get(targetUserName)
+ val credentialInfos = userScramCredential.credentialInfos()
+ credentialInfos.size() == 2
+ }
+ }, "Alter user scram credential timeout")
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ assertEquals(3, userTomResult.size())
+ val userScramCredential = userTomResult.get(targetUserName)
+ assertEquals(targetUserName, userScramCredential.name())
+ val credentialInfos = userScramCredential.credentialInfos()
+ assertEquals(2, credentialInfos.size())
+ val credentialList = credentialInfos.asScala.sortBy(s =>
s.mechanism().`type`())
+ assertEquals(ScramMechanism.SCRAM_SHA_256, credentialList.head.mechanism())
+ assertEquals(4096, credentialList.head.iterations())
+ assertEquals(ScramMechanism.SCRAM_SHA_512, credentialList(1).mechanism())
+ assertEquals(8192, credentialList(1).iterations())
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeProducers(quorum: String): Unit = {
+ client = createAdminClient
+ val tp = new TopicPartition("topic1", 0)
+ client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1,
1.toShort))).all().get()
+
+ def appendCommonRecords = (records: Int) => {
+ val producer = new
KafkaProducer(Collections.singletonMap(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ plaintextBootstrapServers(brokers).asInstanceOf[Object]), new
ByteArraySerializer, new ByteArraySerializer)
+ try {
+ (0 until records).foreach(i =>
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), i.toString.getBytes,
i.toString.getBytes())))
+ } finally producer.close()
+ }
+
+ def appendTransactionRecords(transactionId: String, records: Int, commit:
Boolean): KafkaProducer[Array[Byte], Array[Byte]] = {
+ val producer = TestUtils.createTransactionalProducer(transactionId,
brokers)
+ producer.initTransactions()
+ producer.beginTransaction()
+ (0 until records).foreach(i =>
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), i.toString.getBytes,
i.toString.getBytes())))
+ producer.flush()
+ if (commit) {
+ producer.commitTransaction()
+ producer.close()
+ }
+
+ producer
+ }
+
+ def consumeToExpectedNumber = (expectedNumber: Int) => {
+ val configs = new util.HashMap[String, Object]()
+ configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
plaintextBootstrapServers(brokers))
+ configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.toString)
+ val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, new
ByteArrayDeserializer)
+ try {
+ consumer.assign(Collections.singleton(tp))
+ consumer.seekToBeginning(Collections.singleton(tp))
+ var consumeNum = 0
+ while (consumeNum < expectedNumber) {
+ val records = consumer.poll(time.Duration.ofMillis(100))
+ consumeNum += records.count()
+ }
+ } finally consumer.close()
+ }
+
+ def queryProducerDetail(): mutable.Buffer[ProducerState] = {
+ val producerIterator =
client.describeProducers(Collections.singletonList(tp))
+ .partitionResult(tp).get().activeProducers().asScala
+ producerIterator
+ }
+
+ // send common msg
+ appendCommonRecords(1)
+ val producerIterator = queryProducerDetail()
+ assertEquals(1, producerIterator.size)
+ val producerState = producerIterator.last
+ assertEquals(0, producerState.producerEpoch())
+ assertFalse(producerState.coordinatorEpoch().isPresent)
+ assertFalse(producerState.currentTransactionStartOffset().isPresent)
+
+
+ // send committed transaction msg
+ appendTransactionRecords("foo", 2, commit = true)
+ // consume 3 records to ensure transaction finished
+ consumeToExpectedNumber(3)
+ val transactionProducerIterator = queryProducerDetail()
+ assertEquals(2, transactionProducerIterator.size)
+ val containsCoordinatorEpochIterator = transactionProducerIterator
+ .filter(producer => producer.coordinatorEpoch().isPresent)
+ assertEquals(1, containsCoordinatorEpochIterator.size)
+ val transactionProducerState = containsCoordinatorEpochIterator.last
+
assertFalse(transactionProducerState.currentTransactionStartOffset().isPresent)
+
+
+ // send ongoing transaction msg
+ val ongoingProducer = appendTransactionRecords("foo3", 3, commit = false)
+ val transactionNoneCommitProducerIterator = queryProducerDetail()
+ assertEquals(3, transactionNoneCommitProducerIterator.size)
+ val containsOngoingIterator = transactionNoneCommitProducerIterator
+ .filter(producer => producer.currentTransactionStartOffset().isPresent)
+ assertEquals(1, containsOngoingIterator.size)
+ val ongoingTransactionProducerState = containsOngoingIterator.last
+ // we send (1 common msg) + (2 transaction msg) + (1 transaction marker
msg), so transactionStartOffset is 4
+ assertEquals(4,
ongoingTransactionProducerState.currentTransactionStartOffset().getAsLong)
+ ongoingProducer.close()
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeTransactions(quorum: String): Unit = {
+ client = createAdminClient
+ val tp = new TopicPartition("topic1", 0)
+ client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1,
1.toShort))).all().get()
+
+ var transactionId = "foo"
+
+ def consumeToExpectedNumber = (expectedNumber: Int) => {
+ val configs = new util.HashMap[String, Object]()
+ configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
plaintextBootstrapServers(brokers))
+ configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.toString)
+ val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, new
ByteArrayDeserializer)
+ try {
+ consumer.assign(Collections.singleton(tp))
+ consumer.seekToBeginning(Collections.singleton(tp))
+ var consumeNum = 0
+ while (consumeNum < expectedNumber) {
+ val records = consumer.poll(time.Duration.ofMillis(100))
+ consumeNum += records.count()
+ }
+ } finally consumer.close()
+ }
+
+ def transactionState(): TransactionState = {
+ val transactionResult =
client.describeTransactions(Collections.singleton(transactionId))
+ .description(transactionId).get()
+ transactionResult.state()
+ }
+
+ // normal commit case
+ val producer = TestUtils.createTransactionalProducer(transactionId,
brokers)
+ try {
+ // init, the transaction is not begin, so
TransactionalIdNotFoundException is expected
+ val exception = assertThrows(classOf[ExecutionException], () =>
transactionState())
+
assertTrue(exception.getCause.isInstanceOf[TransactionalIdNotFoundException])
+
+ producer.initTransactions()
+ assertEquals(TransactionState.EMPTY, transactionState())
+ producer.beginTransaction()
+ assertEquals(TransactionState.EMPTY, transactionState())
+
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ producer.flush()
+ assertEquals(TransactionState.ONGOING, transactionState())
+ producer.commitTransaction()
+
+ val state = transactionState()
+ // Either PREPARE_COMMIT or COMPLETE_COMMIT is expected
+ assertTrue(state == TransactionState.PREPARE_COMMIT || state ==
TransactionState.COMPLETE_COMMIT)
+ // producer commit transaction, but maybe transaction coordinator has
not been submitted mark msg
+ // so we start up a consumer and consume the expected number of msg, to
ensure transaction committed
+ consumeToExpectedNumber(1)
+ assertEquals(TransactionState.COMPLETE_COMMIT, transactionState())
+ } finally producer.close()
+
+ // abort case
+ transactionId = "foo2"
+ val abortProducer = TestUtils.createTransactionalProducer(transactionId,
brokers)
+ try {
+ // init, the transaction is not begin, so
TransactionalIdNotFoundException is expected
+ val exception = assertThrows(classOf[ExecutionException], () =>
transactionState())
+
assertTrue(exception.getCause.isInstanceOf[TransactionalIdNotFoundException])
+
+ abortProducer.initTransactions()
+ assertEquals(TransactionState.EMPTY, transactionState())
+ abortProducer.beginTransaction()
+ assertEquals(TransactionState.EMPTY, transactionState())
+
+ abortProducer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ abortProducer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k2".getBytes, "v2".getBytes()))
+ abortProducer.flush()
+ assertEquals(TransactionState.ONGOING, transactionState())
+ abortProducer.abortTransaction()
+ val state = transactionState()
+ assertTrue(state == TransactionState.PREPARE_ABORT || state ==
TransactionState.COMPLETE_ABORT)
+ // producer commit transaction, but maybe transaction coordinator has
not been submitted mark msg
+ // so we start up a consumer and consume the expected number of msg, to
ensure transaction committed
+ consumeToExpectedNumber(1)
+ assertEquals(TransactionState.COMPLETE_ABORT, transactionState())
+ } finally abortProducer.close()
+ }
+
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeTransactionsTimeoutMs(quorum: String): Unit = {
+ client = createAdminClient
+ val tp = new TopicPartition("topic1", 0)
+ client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1,
1.toShort))).all().get()
+
+ val transactionId = "foo"
+
+ val producer = TestUtils.createTransactionalProducer(transactionId,
brokers)
+ try {
+ producer.initTransactions()
+ producer.beginTransaction()
+
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ producer.flush()
+ producer.commitTransaction()
+
+
+ val exception = assertThrows(classOf[ExecutionException], () => {
+ val timeoutOptions = new DescribeTransactionsOptions()
+ timeoutOptions.timeoutMs(0)
+ client.describeTransactions(Collections.singleton(transactionId),
timeoutOptions)
+ .description(transactionId).get()
+ })
+ assertTrue(exception.getCause.isInstanceOf[TimeoutException])
+ } finally producer.close()
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testAbortTransactionTimeout(quorum: String): Unit = {
+ client = createAdminClient
+ val tp = new TopicPartition("topic1", 0)
+ client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1,
1.toShort))).all().get()
+
+ val producer = TestUtils.createTransactionalProducer("foo", brokers)
+ try {
+ producer.initTransactions()
+ producer.beginTransaction()
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ producer.flush()
+ producer.commitTransaction()
+
+ producer.beginTransaction()
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k2".getBytes, "v2".getBytes()))
+ producer.flush()
+
+ val transactionalProducer =
client.describeProducers(Collections.singletonList(tp))
+
.partitionResult(tp).get().activeProducers().asScala.minBy(_.producerId())
+
+ val exception = assertThrows(classOf[ExecutionException], () => {
+ val options = new AbortTransactionOptions()
+ options.timeoutMs(0)
+ client.abortTransaction(
+ new AbortTransactionSpec(tp,
+ transactionalProducer.producerId(),
+ transactionalProducer.producerEpoch().toShort,
+ transactionalProducer.coordinatorEpoch().getAsInt),
options).all().get()
+ })
+ assertTrue(exception.getCause.isInstanceOf[TimeoutException])
+ } finally producer.close()
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testListTransactions(quorum: String): Unit = {
+ def createTransactionList(): Unit = {
+ client = createAdminClient
+ val tp = new TopicPartition("topic1", 0)
+ client.createTopics(Collections.singletonList(new NewTopic(tp.topic(),
1, 1.toShort))).all().get()
+
+ def consumeToExpectedNumber = (expectedNumber: Int) => {
+ val configs = new util.HashMap[String, Object]()
+ configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
plaintextBootstrapServers(brokers))
+ configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.toString)
+ val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer,
new ByteArrayDeserializer)
+ try {
+ consumer.assign(Collections.singleton(tp))
+ consumer.seekToBeginning(Collections.singleton(tp))
+ var consumeNum = 0
+ while (consumeNum < expectedNumber) {
+ val records = consumer.poll(time.Duration.ofMillis(100))
+ consumeNum += records.count()
+ }
+ } finally consumer.close()
+ }
+
+ val producer = TestUtils.createTransactionalProducer("foo", brokers)
+ try {
+ producer.initTransactions()
+ producer.beginTransaction()
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ producer.flush()
+ producer.commitTransaction()
+ } finally producer.close()
+
+ val producer2 = TestUtils.createTransactionalProducer("foo2", brokers)
+ try {
+ producer2.initTransactions()
+ producer2.beginTransaction()
+ producer2.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ producer2.flush()
+ producer2.abortTransaction()
+ } finally producer2.close()
+
+ val producer3 = TestUtils.createTransactionalProducer("foo3", brokers)
+ try {
+ producer3.initTransactions()
+ producer3.beginTransaction()
+ producer3.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ producer3.flush()
+ producer3.commitTransaction()
+ } finally producer3.close()
+
+ consumeToExpectedNumber(2)
+ }
+
+ createTransactionList()
+
+ val result = client.listTransactions().all().get()
+ assertEquals(3, result.size())
+
+ var options = new ListTransactionsOptions()
+ val commitStates: util.Collection[TransactionState] = new
util.ArrayList[TransactionState]()
+ commitStates.add(TransactionState.COMPLETE_COMMIT)
+ options.filterStates(commitStates)
+ assertEquals(2, client.listTransactions(options).all().get().size())
+
+ options = new ListTransactionsOptions()
+ val abortStates: util.Collection[TransactionState] = new
util.ArrayList[TransactionState]()
+ abortStates.add(TransactionState.COMPLETE_ABORT)
+ options.filterStates(abortStates)
+ assertEquals(1, client.listTransactions(options).all().get().size())
+
+ options = new ListTransactionsOptions()
+ val producerIds: util.Collection[java.lang.Long] = new
util.ArrayList[java.lang.Long]()
+ producerIds.add(new java.lang.Long(0L))
Review Comment:
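Please simplify this to `producerIds.add(0L)`; Scala boxes the literal to `java.lang.Long` automatically, and the `Long(long)` constructor is deprecated.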
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,407 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
super.tearDown()
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeUserScramCredentials(quorum: String): Unit = {
+ client = createAdminClient
+
+ // add a new user
+ val targetUserName: String = "tom"
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all.get
+ org.apache.kafka.test.TestUtils.waitForCondition(new TestCondition() {
+ override def conditionMet(): Boolean = {
+ // waiting metadata sync
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ userTomResult.size() == 1
+ }
+ }, "add one user scram credential timeout")
+ val result = client.describeUserScramCredentials().all().get()
+ result.forEach((userName, scramDescription) => {
+ assertEquals(targetUserName, userName)
+ assertEquals(targetUserName, scramDescription.name())
+ val credentialInfos = scramDescription.credentialInfos()
+ assertEquals(1, credentialInfos.size())
+ assertEquals(ScramMechanism.SCRAM_SHA_256,
credentialInfos.get(0).mechanism())
+ assertEquals(4096, credentialInfos.get(0).iterations())
+ })
+
+ // add other users
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion("tom2", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456"),
+ new UserScramCredentialUpsertion("tom3", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all().get
+ org.apache.kafka.test.TestUtils.waitForCondition(new TestCondition() {
+ override def conditionMet(): Boolean = {
+ // waiting metadata sync
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ userTomResult.size() == 3
+ }
+ }, "add user scram credential timeout")
+ assertEquals(3, client.describeUserScramCredentials().all().get().size())
+
+ // alter user info
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_512, 8192), "123456")
+ )).all.get
+ org.apache.kafka.test.TestUtils.waitForCondition(new TestCondition() {
+ override def conditionMet(): Boolean = {
+ // waiting metadata sync
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ val userScramCredential = userTomResult.get(targetUserName)
+ val credentialInfos = userScramCredential.credentialInfos()
+ credentialInfos.size() == 2
+ }
+ }, "Alter user scram credential timeout")
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ assertEquals(3, userTomResult.size())
+ val userScramCredential = userTomResult.get(targetUserName)
+ assertEquals(targetUserName, userScramCredential.name())
+ val credentialInfos = userScramCredential.credentialInfos()
+ assertEquals(2, credentialInfos.size())
+ val credentialList = credentialInfos.asScala.sortBy(s =>
s.mechanism().`type`())
+ assertEquals(ScramMechanism.SCRAM_SHA_256, credentialList.head.mechanism())
+ assertEquals(4096, credentialList.head.iterations())
+ assertEquals(ScramMechanism.SCRAM_SHA_512, credentialList(1).mechanism())
+ assertEquals(8192, credentialList(1).iterations())
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeProducers(quorum: String): Unit = {
+ client = createAdminClient
+ val tp = new TopicPartition("topic1", 0)
+ client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1,
1.toShort))).all().get()
+
+ def appendCommonRecords = (records: Int) => {
+ val producer = new
KafkaProducer(Collections.singletonMap(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ plaintextBootstrapServers(brokers).asInstanceOf[Object]), new
ByteArraySerializer, new ByteArraySerializer)
+ try {
+ (0 until records).foreach(i =>
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), i.toString.getBytes,
i.toString.getBytes())))
+ } finally producer.close()
+ }
+
+ def appendTransactionRecords(transactionId: String, records: Int, commit:
Boolean): KafkaProducer[Array[Byte], Array[Byte]] = {
+ val producer = TestUtils.createTransactionalProducer(transactionId,
brokers)
+ producer.initTransactions()
+ producer.beginTransaction()
+ (0 until records).foreach(i =>
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), i.toString.getBytes,
i.toString.getBytes())))
+ producer.flush()
+ if (commit) {
+ producer.commitTransaction()
+ producer.close()
+ }
+
+ producer
+ }
+
+ def consumeToExpectedNumber = (expectedNumber: Int) => {
+ val configs = new util.HashMap[String, Object]()
+ configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
plaintextBootstrapServers(brokers))
+ configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.toString)
+ val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, new
ByteArrayDeserializer)
+ try {
+ consumer.assign(Collections.singleton(tp))
+ consumer.seekToBeginning(Collections.singleton(tp))
+ var consumeNum = 0
+ while (consumeNum < expectedNumber) {
+ val records = consumer.poll(time.Duration.ofMillis(100))
+ consumeNum += records.count()
+ }
+ } finally consumer.close()
+ }
+
+ def queryProducerDetail(): mutable.Buffer[ProducerState] = {
+ val producerIterator =
client.describeProducers(Collections.singletonList(tp))
+ .partitionResult(tp).get().activeProducers().asScala
+ producerIterator
+ }
+
+ // send common msg
+ appendCommonRecords(1)
+ val producerIterator = queryProducerDetail()
+ assertEquals(1, producerIterator.size)
+ val producerState = producerIterator.last
+ assertEquals(0, producerState.producerEpoch())
+ assertFalse(producerState.coordinatorEpoch().isPresent)
+ assertFalse(producerState.currentTransactionStartOffset().isPresent)
+
+
+ // send committed transaction msg
+ appendTransactionRecords("foo", 2, commit = true)
+ // consume 3 records to ensure transaction finished
+ consumeToExpectedNumber(3)
+ val transactionProducerIterator = queryProducerDetail()
+ assertEquals(2, transactionProducerIterator.size)
+ val containsCoordinatorEpochIterator = transactionProducerIterator
+ .filter(producer => producer.coordinatorEpoch().isPresent)
+ assertEquals(1, containsCoordinatorEpochIterator.size)
+ val transactionProducerState = containsCoordinatorEpochIterator.last
+
assertFalse(transactionProducerState.currentTransactionStartOffset().isPresent)
+
+
+ // send ongoing transaction msg
+ val ongoingProducer = appendTransactionRecords("foo3", 3, commit = false)
+ val transactionNoneCommitProducerIterator = queryProducerDetail()
+ assertEquals(3, transactionNoneCommitProducerIterator.size)
+ val containsOngoingIterator = transactionNoneCommitProducerIterator
+ .filter(producer => producer.currentTransactionStartOffset().isPresent)
+ assertEquals(1, containsOngoingIterator.size)
+ val ongoingTransactionProducerState = containsOngoingIterator.last
+ // we send (1 common msg) + (2 transaction msg) + (1 transaction marker
msg), so transactionStartOffset is 4
+ assertEquals(4,
ongoingTransactionProducerState.currentTransactionStartOffset().getAsLong)
+ ongoingProducer.close()
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeTransactions(quorum: String): Unit = {
+ client = createAdminClient
+ val tp = new TopicPartition("topic1", 0)
+ client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1,
1.toShort))).all().get()
+
+ var transactionId = "foo"
+
+ def consumeToExpectedNumber = (expectedNumber: Int) => {
+ val configs = new util.HashMap[String, Object]()
+ configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
plaintextBootstrapServers(brokers))
+ configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.toString)
+ val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, new
ByteArrayDeserializer)
+ try {
+ consumer.assign(Collections.singleton(tp))
+ consumer.seekToBeginning(Collections.singleton(tp))
+ var consumeNum = 0
+ while (consumeNum < expectedNumber) {
+ val records = consumer.poll(time.Duration.ofMillis(100))
+ consumeNum += records.count()
+ }
+ } finally consumer.close()
+ }
+
+ def transactionState(): TransactionState = {
+ val transactionResult =
client.describeTransactions(Collections.singleton(transactionId))
+ .description(transactionId).get()
+ transactionResult.state()
+ }
+
+ // normal commit case
+ val producer = TestUtils.createTransactionalProducer(transactionId,
brokers)
+ try {
+ // init, the transaction is not begin, so
TransactionalIdNotFoundException is expected
+ val exception = assertThrows(classOf[ExecutionException], () =>
transactionState())
+
assertTrue(exception.getCause.isInstanceOf[TransactionalIdNotFoundException])
+
+ producer.initTransactions()
+ assertEquals(TransactionState.EMPTY, transactionState())
+ producer.beginTransaction()
+ assertEquals(TransactionState.EMPTY, transactionState())
+
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ producer.flush()
+ assertEquals(TransactionState.ONGOING, transactionState())
+ producer.commitTransaction()
+
+ val state = transactionState()
+ // Either PREPARE_COMMIT or COMPLETE_COMMIT is expected
+ assertTrue(state == TransactionState.PREPARE_COMMIT || state ==
TransactionState.COMPLETE_COMMIT)
+ // producer commit transaction, but maybe transaction coordinator has
not been submitted mark msg
+ // so we start up a consumer and consume the expected number of msg, to
ensure transaction committed
+ consumeToExpectedNumber(1)
+ assertEquals(TransactionState.COMPLETE_COMMIT, transactionState())
+ } finally producer.close()
+
+ // abort case
+ transactionId = "foo2"
+ val abortProducer = TestUtils.createTransactionalProducer(transactionId,
brokers)
+ try {
+ // init, the transaction is not begin, so
TransactionalIdNotFoundException is expected
+ val exception = assertThrows(classOf[ExecutionException], () =>
transactionState())
+
assertTrue(exception.getCause.isInstanceOf[TransactionalIdNotFoundException])
+
+ abortProducer.initTransactions()
+ assertEquals(TransactionState.EMPTY, transactionState())
+ abortProducer.beginTransaction()
+ assertEquals(TransactionState.EMPTY, transactionState())
+
+ abortProducer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ abortProducer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k2".getBytes, "v2".getBytes()))
+ abortProducer.flush()
+ assertEquals(TransactionState.ONGOING, transactionState())
+ abortProducer.abortTransaction()
+ val state = transactionState()
+ assertTrue(state == TransactionState.PREPARE_ABORT || state ==
TransactionState.COMPLETE_ABORT)
+ // producer commit transaction, but maybe transaction coordinator has
not been submitted mark msg
+ // so we start up a consumer and consume the expected number of msg, to
ensure transaction committed
+ consumeToExpectedNumber(1)
+ assertEquals(TransactionState.COMPLETE_ABORT, transactionState())
+ } finally abortProducer.close()
+ }
+
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeTransactionsTimeoutMs(quorum: String): Unit = {
+ client = createAdminClient
+ val tp = new TopicPartition("topic1", 0)
+ client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1,
1.toShort))).all().get()
+
+ val transactionId = "foo"
+
+ val producer = TestUtils.createTransactionalProducer(transactionId,
brokers)
+ try {
+ producer.initTransactions()
+ producer.beginTransaction()
+
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ producer.flush()
+ producer.commitTransaction()
+
+
+ val exception = assertThrows(classOf[ExecutionException], () => {
+ val timeoutOptions = new DescribeTransactionsOptions()
+ timeoutOptions.timeoutMs(0)
+ client.describeTransactions(Collections.singleton(transactionId),
timeoutOptions)
+ .description(transactionId).get()
+ })
+ assertTrue(exception.getCause.isInstanceOf[TimeoutException])
+ } finally producer.close()
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testAbortTransactionTimeout(quorum: String): Unit = {
+ client = createAdminClient
+ val tp = new TopicPartition("topic1", 0)
+ client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1,
1.toShort))).all().get()
+
+ val producer = TestUtils.createTransactionalProducer("foo", brokers)
+ try {
+ producer.initTransactions()
+ producer.beginTransaction()
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ producer.flush()
+ producer.commitTransaction()
+
+ producer.beginTransaction()
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k2".getBytes, "v2".getBytes()))
+ producer.flush()
+
+ val transactionalProducer =
client.describeProducers(Collections.singletonList(tp))
+
.partitionResult(tp).get().activeProducers().asScala.minBy(_.producerId())
+
+ val exception = assertThrows(classOf[ExecutionException], () => {
+ val options = new AbortTransactionOptions()
+ options.timeoutMs(0)
+ client.abortTransaction(
+ new AbortTransactionSpec(tp,
+ transactionalProducer.producerId(),
+ transactionalProducer.producerEpoch().toShort,
+ transactionalProducer.coordinatorEpoch().getAsInt),
options).all().get()
+ })
+ assertTrue(exception.getCause.isInstanceOf[TimeoutException])
+ } finally producer.close()
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testListTransactions(quorum: String): Unit = {
+ def createTransactionList(): Unit = {
+ client = createAdminClient
+ val tp = new TopicPartition("topic1", 0)
+ client.createTopics(Collections.singletonList(new NewTopic(tp.topic(),
1, 1.toShort))).all().get()
+
+ def consumeToExpectedNumber = (expectedNumber: Int) => {
+ val configs = new util.HashMap[String, Object]()
+ configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
plaintextBootstrapServers(brokers))
+ configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.toString)
+ val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer,
new ByteArrayDeserializer)
+ try {
+ consumer.assign(Collections.singleton(tp))
+ consumer.seekToBeginning(Collections.singleton(tp))
+ var consumeNum = 0
+ while (consumeNum < expectedNumber) {
+ val records = consumer.poll(time.Duration.ofMillis(100))
+ consumeNum += records.count()
+ }
+ } finally consumer.close()
+ }
+
+ val producer = TestUtils.createTransactionalProducer("foo", brokers)
+ try {
+ producer.initTransactions()
+ producer.beginTransaction()
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ producer.flush()
+ producer.commitTransaction()
+ } finally producer.close()
+
+ val producer2 = TestUtils.createTransactionalProducer("foo2", brokers)
+ try {
+ producer2.initTransactions()
+ producer2.beginTransaction()
+ producer2.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ producer2.flush()
+ producer2.abortTransaction()
+ } finally producer2.close()
+
+ val producer3 = TestUtils.createTransactionalProducer("foo3", brokers)
+ try {
+ producer3.initTransactions()
+ producer3.beginTransaction()
+ producer3.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ producer3.flush()
+ producer3.commitTransaction()
+ } finally producer3.close()
+
+ consumeToExpectedNumber(2)
+ }
+
+ createTransactionList()
+
+ val result = client.listTransactions().all().get()
+ assertEquals(3, result.size())
+
+ var options = new ListTransactionsOptions()
+ val commitStates: util.Collection[TransactionState] = new
util.ArrayList[TransactionState]()
+ commitStates.add(TransactionState.COMPLETE_COMMIT)
+ options.filterStates(commitStates)
+ assertEquals(2, client.listTransactions(options).all().get().size())
+
+ options = new ListTransactionsOptions()
+ val abortStates: util.Collection[TransactionState] = new
util.ArrayList[TransactionState]()
+ abortStates.add(TransactionState.COMPLETE_ABORT)
+ options.filterStates(abortStates)
+ assertEquals(1, client.listTransactions(options).all().get().size())
+
+ options = new ListTransactionsOptions()
+ val producerIds: util.Collection[java.lang.Long] = new
util.ArrayList[java.lang.Long]()
+ producerIds.add(new java.lang.Long(0L))
+ options.filterProducerIds(producerIds)
+ assertEquals(1, client.listTransactions(options).all().get().size())
+
+ // ensure all transaction's txnStartTimestamp >= 500
+ options = new ListTransactionsOptions()
+ Thread.sleep(501)
+ options.filterOnDuration(500)
Review Comment:
Could you please also add a test that combines multiple filters in a single `ListTransactionsOptions`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]