This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c72fcda85cc MINOR: Cleanup TestUtils.scala (#22091)
c72fcda85cc is described below
commit c72fcda85cc39d33738c16311d9268af8dcacdfe
Author: Ken Huang <[email protected]>
AuthorDate: Sat May 2 10:10:22 2026 +0800
MINOR: Cleanup TestUtils.scala (#22091)
Some methods are used by only a single class, so they should be moved into
that class. Java tests should not use TestUtils.scala.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../kafka/api/AuthorizerIntegrationTest.scala | 4 +-
.../kafka/api/BaseAdminIntegrationTest.scala | 5 +-
.../kafka/api/PlaintextAdminIntegrationTest.scala | 192 +++++++++----
.../kafka/integration/KafkaServerTestHarness.scala | 31 ++-
.../test/scala/unit/kafka/log/LogCleanerTest.scala | 6 +-
.../test/scala/unit/kafka/log/LogTestUtils.scala | 6 +-
.../scala/unit/kafka/metrics/MetricsTest.scala | 44 ++-
.../unit/kafka/network/RequestChannelTest.scala | 46 +++-
.../server/GroupCoordinatorBaseRequestTest.scala | 9 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 15 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 300 +--------------------
.../org/apache/kafka/tools/ConfigCommandTest.java | 5 +-
.../kafka/tools/LeaderElectionCommandTest.java | 154 ++++++++---
13 files changed, 406 insertions(+), 411 deletions(-)
diff --git
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 161a6bf245d..d6c94d5baa4 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -4189,11 +4189,11 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
}
def removeAllClientAcls(): Unit = {
- val authorizerForWrite = TestUtils.pickAuthorizerForWrite(brokers,
controllerServers)
+ val authorizerForWrite = pickAuthorizerForWrite(brokers, controllerServers)
val aclEntryFilter = new AccessControlEntryFilter(clientPrincipalString,
null, AclOperation.ANY, AclPermissionType.ANY)
val aclFilter = new AclBindingFilter(ResourcePatternFilter.ANY,
aclEntryFilter)
- authorizerForWrite.deleteAcls(TestUtils.anonymousAuthorizableContext,
java.util.List.of(aclFilter)).asScala.
+ authorizerForWrite.deleteAcls(anonymousAuthorizableContext,
java.util.List.of(aclFilter)).asScala.
map(_.toCompletableFuture.get).flatMap { deletion =>
deletion.aclBindingDeleteResults().asScala.map(_.aclBinding.pattern).toSet
}.foreach { resource =>
diff --git
a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
index 16dec9dc008..5760042bbd5 100644
--- a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
@@ -58,7 +58,10 @@ abstract class BaseAdminIntegrationTest extends
IntegrationTestHarness with Logg
override def setUp(testInfo: TestInfo): Unit = {
this.testInfo = testInfo
super.setUp(testInfo)
- waitUntilBrokerMetadataIsPropagated(brokers)
+ val expectedBrokerIds = brokers.map(_.config.brokerId).toSet
+ waitUntilTrue(() => brokers.forall(server =>
+
expectedBrokerIds.forall(server.dataPlaneRequestProcessor.metadataCache.hasAliveBroker(_))
+ ), "Timed out waiting for broker metadata to propagate to all servers",
15000)
}
@AfterEach
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index b118a4ec02d..cadcddd99a5 100644
---
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -70,7 +70,7 @@ import scala.collection.Seq
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import scala.jdk.CollectionConverters._
-import scala.util.{Random, Using}
+import scala.util.{Failure, Random, Success, Try, Using}
/**
* An integration test of the KafkaAdminClient.
@@ -1552,7 +1552,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
val result1 = client.deleteRecords(util.Map.of(topicPartition,
RecordsToDelete.beforeOffset(117L)))
result1.all().get()
restartDeadBrokers()
- TestUtils.waitForBrokersInIsr(client, topicPartition, Set(followerIndex))
+ waitForBrokersInIsr(client, topicPartition, Set(followerIndex))
waitForFollowerLog(expectedStartOffset=117L, expectedEndOffset=200L)
}
@@ -1675,7 +1675,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
// delete records in corrupt segment (the first segment)
client.deleteRecords(util.Map.of(topicPartition,
RecordsToDelete.beforeOffset(firstSegmentRecordsSize))).all.get
// verify reassignment is finished after delete records
- TestUtils.waitForBrokersInIsr(client, topicPartition,
Set(partitionLeaderId, partitionFollowerId))
+ waitForBrokersInIsr(client, topicPartition, Set(partitionLeaderId,
partitionFollowerId))
// seek to beginning and make sure we can consume all records
consumer.seekToBeginning(util.List.of(topicPartition))
assertEquals(19, TestUtils.consumeRecords(consumer, 20 -
firstSegmentRecordsSize).last.offset())
@@ -3152,25 +3152,25 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
s"Expected preferred leader to become $preferred, but is
${preferredLeader(partition1)} and ${preferredLeader(partition2)}",
10000)
// Check the leader hasn't moved
- TestUtils.assertLeader(client, partition1, prior1)
- TestUtils.assertLeader(client, partition2, prior2)
+ assertLeader(client, partition1, prior1)
+ assertLeader(client, partition2, prior2)
}
// Check current leaders are 0
- TestUtils.assertLeader(client, partition1, 0)
- TestUtils.assertLeader(client, partition2, 0)
+ assertLeader(client, partition1, 0)
+ assertLeader(client, partition2, 0)
// Noop election
var electResult = client.electLeaders(ElectionType.PREFERRED,
util.Set.of(partition1))
val exception = electResult.partitions.get.get(partition1).get
assertEquals(classOf[ElectionNotNeededException], exception.getClass)
- TestUtils.assertLeader(client, partition1, 0)
+ assertLeader(client, partition1, 0)
// Noop election with null partitions
electResult = client.electLeaders(ElectionType.PREFERRED, null)
assertTrue(electResult.partitions.get.isEmpty)
- TestUtils.assertLeader(client, partition1, 0)
- TestUtils.assertLeader(client, partition2, 0)
+ assertLeader(client, partition1, 0)
+ assertLeader(client, partition2, 0)
// Now change the preferred leader to 1
waitForBrokerMetadataPropagation(partition1)
@@ -3182,18 +3182,18 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
assertEquals(util.Set.of(partition1), electResult.partitions.get.keySet)
electResult.partitions.get.get(partition1)
.ifPresent(t => fail(s"Unexpected exception during leader election: $t
for partition $partition1"))
- TestUtils.assertLeader(client, partition1, 1)
+ assertLeader(client, partition1, 1)
// topic 2 unchanged
assertFalse(electResult.partitions.get.containsKey(partition2))
- TestUtils.assertLeader(client, partition2, 0)
+ assertLeader(client, partition2, 0)
// meaningful election with null partitions
electResult = client.electLeaders(ElectionType.PREFERRED, null)
assertEquals(Set(partition2), electResult.partitions.get.keySet.asScala)
electResult.partitions.get.get(partition2)
.ifPresent(t => fail(s"Unexpected exception during leader election: $t
for partition $partition2"))
- TestUtils.assertLeader(client, partition2, 1)
+ assertLeader(client, partition2, 1)
def assertUnknownTopicOrPartition(
topicPartition: TopicPartition,
@@ -3209,8 +3209,8 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
electResult = client.electLeaders(ElectionType.PREFERRED,
util.Set.of(unknownPartition))
assertEquals(util.Set.of(unknownPartition),
electResult.partitions.get.keySet)
assertUnknownTopicOrPartition(unknownPartition, electResult)
- TestUtils.assertLeader(client, partition1, 1)
- TestUtils.assertLeader(client, partition2, 1)
+ assertLeader(client, partition1, 1)
+ assertLeader(client, partition2, 1)
// Now change the preferred leader to 2
waitForBrokerMetadataPropagation(partition1)
@@ -3220,15 +3220,15 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
// mixed results
electResult = client.electLeaders(ElectionType.PREFERRED,
util.Set.of(unknownPartition, partition1))
assertEquals(util.Set.of(unknownPartition, partition1),
electResult.partitions.get.keySet)
- TestUtils.assertLeader(client, partition1, 2)
- TestUtils.assertLeader(client, partition2, 1)
+ assertLeader(client, partition1, 2)
+ assertLeader(client, partition2, 1)
assertUnknownTopicOrPartition(unknownPartition, electResult)
// elect preferred leader for partition 2
electResult = client.electLeaders(ElectionType.PREFERRED,
util.Set.of(partition2))
assertEquals(util.Set.of(partition2), electResult.partitions.get.keySet)
assertFalse(electResult.partitions.get.get(partition2).isPresent)
- TestUtils.assertLeader(client, partition2, 2)
+ assertLeader(client, partition2, 2)
// Now change the preferred leader to 1
waitForBrokerMetadataPropagation(partition1)
@@ -3238,7 +3238,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
killBroker(1)
waitForBrokerMetadataPropagation(partition1)
waitForBrokerMetadataPropagation(partition2)
- TestUtils.waitForBrokersOutOfIsr(client, Set(partition1, partition2),
Set(1))
+ waitForBrokersOutOfIsr(client, Set(partition1, partition2), Set(1))
def assertPreferredLeaderNotAvailable(
topicPartition: TopicPartition,
@@ -3257,17 +3257,17 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
assertEquals(util.Set.of(partition1), electResult.partitions.get.keySet)
assertPreferredLeaderNotAvailable(partition1, electResult)
- TestUtils.assertLeader(client, partition1, 2)
+ assertLeader(client, partition1, 2)
// preferred leader unavailable with null argument
electResult = client.electLeaders(ElectionType.PREFERRED, null,
shortTimeout)
assertTrue(Set(partition1,
partition2).subsetOf(electResult.partitions.get.keySet.asScala))
assertPreferredLeaderNotAvailable(partition1, electResult)
- TestUtils.assertLeader(client, partition1, 2)
+ assertLeader(client, partition1, 2)
assertPreferredLeaderNotAvailable(partition2, electResult)
- TestUtils.assertLeader(client, partition2, 2)
+ assertLeader(client, partition2, 2)
}
@Test
@@ -3283,19 +3283,19 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
val partition1 = new TopicPartition("unclean-test-topic-1", 0)
createTopicWithAssignment(partition1.topic, Map[Int,
Seq[Int]](partition1.partition -> assignment1))
- TestUtils.assertLeader(client, partition1, broker1)
+ assertLeader(client, partition1, broker1)
killBroker(broker2)
- TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
+ waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
killBroker(broker1)
- TestUtils.assertNoLeader(client, partition1)
+ assertNoLeader(client, partition1)
brokers(broker2).startup()
- TestUtils.waitForOnlineBroker(client, broker2)
+ waitForOnlineBroker(client, broker2)
val electResult = client.electLeaders(ElectionType.UNCLEAN,
util.Set.of(partition1))
electResult.partitions.get.get(partition1)
.ifPresent(t => fail(s"Unexpected exception during leader election: $t
for partition $partition1"))
- TestUtils.assertLeader(client, partition1, broker2)
+ assertLeader(client, partition1, broker2)
}
@Test
@@ -3318,24 +3318,24 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
Map(partition1.partition -> assignment1, partition2.partition ->
assignment2)
)
- TestUtils.assertLeader(client, partition1, broker1)
- TestUtils.assertLeader(client, partition2, broker1)
+ assertLeader(client, partition1, broker1)
+ assertLeader(client, partition2, broker1)
killBroker(broker2)
- TestUtils.waitForBrokersOutOfIsr(client, Set(partition1, partition2),
Set(broker2))
+ waitForBrokersOutOfIsr(client, Set(partition1, partition2), Set(broker2))
killBroker(broker1)
- TestUtils.assertNoLeader(client, partition1)
- TestUtils.assertNoLeader(client, partition2)
+ assertNoLeader(client, partition1)
+ assertNoLeader(client, partition2)
brokers(broker2).startup()
- TestUtils.waitForOnlineBroker(client, broker2)
+ waitForOnlineBroker(client, broker2)
val electResult = client.electLeaders(ElectionType.UNCLEAN,
util.Set.of(partition1, partition2))
electResult.partitions.get.get(partition1)
.ifPresent(t => fail(s"Unexpected exception during leader election: $t
for partition $partition1"))
electResult.partitions.get.get(partition2)
.ifPresent(t => fail(s"Unexpected exception during leader election: $t
for partition $partition2"))
- TestUtils.assertLeader(client, partition1, broker2)
- TestUtils.assertLeader(client, partition2, broker2)
+ assertLeader(client, partition1, broker2)
+ assertLeader(client, partition2, broker2)
}
@Test
@@ -3359,23 +3359,23 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
Map(partition1.partition -> assignment1, partition2.partition ->
assignment2)
)
- TestUtils.assertLeader(client, partition1, broker1)
- TestUtils.assertLeader(client, partition2, broker1)
+ assertLeader(client, partition1, broker1)
+ assertLeader(client, partition2, broker1)
killBroker(broker2)
- TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
+ waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
killBroker(broker1)
- TestUtils.assertNoLeader(client, partition1)
- TestUtils.assertLeader(client, partition2, broker3)
+ assertNoLeader(client, partition1)
+ assertLeader(client, partition2, broker3)
brokers(broker2).startup()
- TestUtils.waitForOnlineBroker(client, broker2)
+ waitForOnlineBroker(client, broker2)
val electResult = client.electLeaders(ElectionType.UNCLEAN, null)
electResult.partitions.get.get(partition1)
.ifPresent(t => fail(s"Unexpected exception during leader election: $t
for partition $partition1"))
assertFalse(electResult.partitions.get.containsKey(partition2))
- TestUtils.assertLeader(client, partition1, broker2)
- TestUtils.assertLeader(client, partition2, broker3)
+ assertLeader(client, partition1, broker2)
+ assertLeader(client, partition2, broker3)
}
@Test
@@ -3397,7 +3397,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
Map(0 -> assignment1)
)
- TestUtils.assertLeader(client, new TopicPartition(topic, 0), broker1)
+ assertLeader(client, new TopicPartition(topic, 0), broker1)
val electResult = client.electLeaders(ElectionType.UNCLEAN,
util.Set.of(unknownPartition, unknownTopic))
assertTrue(electResult.partitions.get.get(unknownPartition).get.isInstanceOf[UnknownTopicOrPartitionException])
@@ -3422,12 +3422,12 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
Map(partition1.partition -> assignment1)
)
- TestUtils.assertLeader(client, partition1, broker1)
+ assertLeader(client, partition1, broker1)
killBroker(broker2)
- TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
+ waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
killBroker(broker1)
- TestUtils.assertNoLeader(client, partition1)
+ assertNoLeader(client, partition1)
val electResult = client.electLeaders(ElectionType.UNCLEAN,
util.Set.of(partition1))
assertTrue(electResult.partitions.get.get(partition1).get.isInstanceOf[EligibleLeadersNotAvailableException])
@@ -3451,10 +3451,10 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
Map(partition1.partition -> assignment1)
)
- TestUtils.assertLeader(client, partition1, broker1)
+ assertLeader(client, partition1, broker1)
killBroker(broker1)
- TestUtils.assertLeader(client, partition1, broker2)
+ assertLeader(client, partition1, broker2)
brokers(broker1).startup()
val electResult = client.electLeaders(ElectionType.UNCLEAN,
util.Set.of(partition1))
@@ -3482,23 +3482,23 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
Map(partition1.partition -> assignment1, partition2.partition ->
assignment2)
)
- TestUtils.assertLeader(client, partition1, broker1)
- TestUtils.assertLeader(client, partition2, broker1)
+ assertLeader(client, partition1, broker1)
+ assertLeader(client, partition2, broker1)
killBroker(broker2)
- TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
+ waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
killBroker(broker1)
- TestUtils.assertNoLeader(client, partition1)
- TestUtils.assertLeader(client, partition2, broker3)
+ assertNoLeader(client, partition1)
+ assertLeader(client, partition2, broker3)
brokers(broker2).startup()
- TestUtils.waitForOnlineBroker(client, broker2)
+ waitForOnlineBroker(client, broker2)
val electResult = client.electLeaders(ElectionType.UNCLEAN,
util.Set.of(partition1, partition2))
electResult.partitions.get.get(partition1)
.ifPresent(t => fail(s"Unexpected exception during leader election: $t
for partition $partition1"))
assertTrue(electResult.partitions.get.get(partition2).get.isInstanceOf[ElectionNotNeededException])
- TestUtils.assertLeader(client, partition1, broker2)
- TestUtils.assertLeader(client, partition2, broker3)
+ assertLeader(client, partition1, broker2)
+ assertLeader(client, partition2, broker3)
}
@Test
@@ -5061,4 +5061,82 @@ object PlaintextAdminIntegrationTest {
assertEquals(ServerLogConfigs.COMPRESSION_TYPE_DEFAULT,
configs.get(brokerResource).get(ServerConfigs.COMPRESSION_TYPE_CONFIG).value)
}
+
+ private def waitForBrokersInIsr(client: Admin, partition: TopicPartition,
brokerIds: Set[Int]): Unit = {
+ waitUntilTrue(
+ () => {
+ val isr = client.describeTopics(util.Set.of(partition.topic))
+ .allTopicNames
+ .get
+ .get(partition.topic)
+ .partitions.asScala
+ .filter(_.partition == partition.partition)
+ .flatMap(_.isr.asScala)
+ .map(_.id)
+ .toSet
+ brokerIds.subsetOf(isr)
+ },
+ s"Expected brokers $brokerIds to be in the ISR for $partition"
+ )
+ }
+
+ private def waitForBrokersOutOfIsr(client: Admin, partition:
Set[TopicPartition], brokerIds: Set[Int]): Unit = {
+ waitUntilTrue(
+ () => {
+ val description =
client.describeTopics(partition.map(_.topic).asJava).allTopicNames.get.asScala
+ val isr = description
+ .flatMap { case (topic, desc) =>
+ desc.partitions.asScala
+ .filter(info => partition.contains(new TopicPartition(topic,
info.partition)))
+ .flatMap(_.isr.asScala)
+ }
+ .map(_.id)
+ .toSet
+
+ brokerIds.intersect(isr).isEmpty
+ },
+ s"Expected brokers $brokerIds to no longer be in the ISR for $partition"
+ )
+ }
+
+ private def waitForOnlineBroker(client: Admin, brokerId: Int): Unit = {
+ waitUntilTrue(() => {
+ val nodes = client.describeCluster().nodes().get()
+ nodes.asScala.exists(_.id == brokerId)
+ }, s"Timed out waiting for brokerId $brokerId to come online")
+ }
+
+ private def assertLeader(client: Admin, topicPartition: TopicPartition,
expectedLeader: Int): Unit = {
+ waitForLeaderToBecome(client, topicPartition, Some(expectedLeader))
+ }
+
+ private def assertNoLeader(client: Admin, topicPartition: TopicPartition):
Unit = {
+ waitForLeaderToBecome(client, topicPartition, None)
+ }
+
+ private def waitForLeaderToBecome(
+ client: Admin,
+ topicPartition: TopicPartition,
+ expectedLeaderOpt: Option[Int]
+ ): Unit = {
+ val topic = topicPartition.topic
+ val partitionId = topicPartition.partition
+
+ def currentLeader: Try[Option[Int]] = Try {
+ val topicDescription =
client.describeTopics(util.List.of(topic)).allTopicNames.get.get(topic)
+ topicDescription.partitions.asScala
+ .find(_.partition == partitionId)
+ .flatMap(partitionState => Option(partitionState.leader))
+ .map(_.id)
+ }
+
+ val (lastLeaderCheck, isLeaderElected) = computeUntilTrue(currentLeader) {
+ case Success(leaderOpt) => leaderOpt == expectedLeaderOpt
+ case Failure(e: ExecutionException) if
e.getCause.isInstanceOf[UnknownTopicOrPartitionException] => false
+ case Failure(e) => throw e
+ }
+
+ assertTrue(isLeaderElected, s"Timed out waiting for leader to become
$expectedLeaderOpt. " +
+ s"Last metadata lookup returned leader =
${lastLeaderCheck.getOrElse("unknown")}")
+ }
}
diff --git
a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 7f37eeb25a1..4b6690a4fdc 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -24,14 +24,16 @@ import org.apache.kafka.common.acl.{AccessControlEntry,
AccessControlEntryFilter
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
import org.apache.kafka.common.resource.ResourcePattern
-import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.security.scram.ScramCredential
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{KafkaException, Uuid}
import
org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT
+import org.apache.kafka.server.authorizer.{AuthorizableRequestContext,
Authorizer => JAuthorizer}
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import java.io.File
+import java.net.InetAddress
import java.time.Duration
import java.util
import java.util.Properties
@@ -214,6 +216,17 @@ abstract class KafkaServerTestHarness extends
QuorumTestHarness {
}
}
+ val anonymousAuthorizableContext = new AuthorizableRequestContext() {
+ override def listenerName(): String = ""
+ override def securityProtocol(): SecurityProtocol =
SecurityProtocol.PLAINTEXT
+ override def principal(): KafkaPrincipal = KafkaPrincipal.ANONYMOUS
+ override def clientAddress(): InetAddress = null
+ override def requestType(): Int = 0
+ override def requestVersion(): Int = 0
+ override def clientId(): String = ""
+ override def correlationId(): Int = 0
+ }
+
def addAndVerifyAcls(acls: Set[AccessControlEntry], resource:
ResourcePattern): Unit = {
val authorizerForWrite = pickAuthorizerForWrite(brokers, controllerServers)
val aclBindings = acls.map { acl => new AclBinding(resource, acl) }
@@ -371,4 +384,20 @@ abstract class KafkaServerTestHarness extends
QuorumTestHarness {
}
}
}
+
+ /**
+ * Find an Authorizer that we can call createAcls or deleteAcls on.
+ */
+ def pickAuthorizerForWrite[B <: KafkaBroker](brokers: Seq[B], controllers:
Seq[ControllerServer]): JAuthorizer = {
+ if (controllers.isEmpty) {
+ brokers.head.authorizerPlugin.get.get
+ } else {
+ var result: JAuthorizer = null
+ TestUtils.retry(120000) {
+ val active = controllers.filter(_.controller.isActive).head
+ result = active.authorizerPlugin.get.get
+ }
+ result
+ }
+ }
}
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index c51cba6a66f..e1aa985e01c 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -1229,7 +1229,7 @@ class LogCleanerTest extends Logging {
// the last (active) segment has just one message
- def distinctValuesBySegment = log.logSegments.asScala.map(s =>
s.log.records.asScala.map(record =>
TestUtils.readString(record.value)).toSet.size).toSeq
+ def distinctValuesBySegment = log.logSegments.asScala.map(s =>
s.log.records.asScala.map(record =>
Utils.utf8(record.value())).toSet.size).toSeq
val distinctValuesBySegmentBeforeClean = distinctValuesBySegment
assertTrue(distinctValuesBySegment.reverse.tail.forall(_ > N),
@@ -1927,7 +1927,7 @@ class LogCleanerTest extends Logging {
for (segment <- log.logSegments.asScala; batch <-
segment.log.batches.asScala; record <- batch.asScala) {
assertTrue(record.hasMagic(batch.magic))
- val value = TestUtils.readString(record.value).toLong
+ val value = Utils.utf8(record.value()).toLong
assertEquals(record.offset, value)
}
}
@@ -1947,7 +1947,7 @@ class LogCleanerTest extends Logging {
for (logEntry <- records.records.asScala) {
val offset = logEntry.offset
- val value = TestUtils.readString(logEntry.value).toLong
+ val value = Utils.utf8(logEntry.value()).toLong
assertEquals(offset, value)
}
}
diff --git a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
index 9875a3b2ce4..e5d34608a04 100644
--- a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
@@ -19,7 +19,6 @@ package kafka.log
import java.io.File
import java.util.Properties
-import kafka.utils.TestUtils
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.record.internal.{ControlRecordType,
EndTransactionMarker, FileRecords, MemoryRecords, RecordBatch, SimpleRecord}
@@ -38,7 +37,7 @@ import org.apache.kafka.server.storage.log.FetchIsolation
import org.apache.kafka.server.util.Scheduler
import
org.apache.kafka.storage.internals.log.LogConfig.{DEFAULT_REMOTE_LOG_COPY_DISABLE_CONFIG,
DEFAULT_REMOTE_LOG_DELETE_ON_DISABLE_CONFIG}
import org.apache.kafka.common.message.AbortedTxn
-import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo,
LazyIndex, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils,
LogOffsetsListener, LogSegment, ProducerStateManager,
ProducerStateManagerConfig, TransactionIndex, VerificationGuard, UnifiedLog}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo,
LazyIndex, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils,
LogOffsetsListener, LogSegment, ProducerStateManager,
ProducerStateManagerConfig, TransactionIndex, UnifiedLog, VerificationGuard}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import scala.jdk.CollectionConverters._
@@ -202,7 +201,7 @@ object LogTestUtils {
for (logSegment <- log.logSegments.asScala;
batch <- logSegment.log.batches.asScala if !batch.isControlBatch;
record <- batch.asScala if record.hasValue && record.hasKey)
- yield TestUtils.readString(record.key).toLong
+ yield Utils.utf8(record.key()).toLong
}
def recoverAndCheck(logDir: File, config: LogConfig, expectedKeys:
Iterable[Long], brokerTopicStats: BrokerTopicStats, time: Time, scheduler:
Scheduler): UnifiedLog = {
@@ -308,4 +307,5 @@ object LogTestUtils {
sequence += numRecords
}
}
+
}
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index 770ec432118..e30531779c2 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -36,11 +36,14 @@ import org.apache.kafka.common.metrics.JmxReporter
import org.apache.kafka.common.utils.Time
import org.apache.kafka.server.config.ServerLogConfigs
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics,
LinuxIoMetricsCollector}
+import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile
+import org.apache.kafka.storage.internals.log.UnifiedLog
import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics
import org.junit.jupiter.api.{Test, Timeout}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
+import java.io.File
import scala.jdk.OptionConverters.RichOptional
@Timeout(120)
@@ -64,7 +67,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging
{
val topic = "test-topic-metric"
createTopic(topic)
deleteTopic(topic)
- TestUtils.verifyTopicDeletion(topic, 1, brokers)
+ verifyTopicDeletion(topic, brokers)
assertEquals(Set.empty, topicMetricGroups(topic), "Topic metrics exists
after deleteTopic")
}
@@ -78,7 +81,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging
{
assertTrue(topicMetricGroups(topic).nonEmpty, "Topic metrics don't exist")
brokers.foreach(b => assertNotNull(b.brokerTopicStats.topicStats(topic)))
deleteTopic(topic)
- TestUtils.verifyTopicDeletion(topic, 1, brokers)
+ verifyTopicDeletion(topic, brokers)
assertEquals(Set.empty, topicMetricGroups(topic), "Topic metrics exists
after deleteTopic")
}
@@ -244,4 +247,41 @@ class MetricsTest extends KafkaServerTestHarness with
Logging {
val pattern = (".*BrokerTopicMetrics.*" + topic.map(t =>
s"($t)$$").getOrElse("")).r.pattern
metrics.filter(pattern.matcher(_).matches())
}
+
+ private def verifyTopicDeletion[B <: KafkaBroker](topic: String, brokers:
Seq[B]): Unit = {
+ val topicPartitions = (0 until 1).map(new TopicPartition(topic, _))
+ // ensure that the topic-partition has been deleted from all brokers'
replica managers
+ TestUtils.waitUntilTrue(() =>
+ brokers.forall(broker => topicPartitions.forall(tp =>
broker.replicaManager.onlinePartition(tp).isEmpty)),
+ "Replica manager's should have deleted all of this topic's partitions")
+ // ensure that logs from all replicas are deleted
+ TestUtils.waitUntilTrue(() => brokers.forall(broker =>
topicPartitions.forall(tp => broker.logManager.getLog(tp).isEmpty)),
+ "Replica logs not deleted after delete topic is complete")
+ // ensure that topic is removed from all cleaner offsets
+ TestUtils.waitUntilTrue(() => brokers.forall(broker =>
topicPartitions.forall { tp =>
+ val checkpoints = broker.logManager.liveLogDirs.asScala.map { logDir =>
+ new OffsetCheckpointFile(new File(logDir,
"cleaner-offset-checkpoint"), null).read()
+ }
+ checkpoints.forall(checkpointsPerLogDir =>
!checkpointsPerLogDir.containsKey(tp))
+ }), "Cleaner offset for deleted partition should have been removed")
+ TestUtils.waitUntilTrue(() => brokers.forall(broker =>
+ broker.config.logDirs.stream().allMatch { logDir =>
+ topicPartitions.forall { tp =>
+ !new File(logDir, tp.topic + "-" + tp.partition).exists()
+ }
+ }
+ ), "Failed to soft-delete the data to a delete directory")
+ TestUtils.waitUntilTrue(() => brokers.forall(broker =>
+ broker.config.logDirs.stream().allMatch { logDir =>
+ topicPartitions.forall { tp =>
+ !util.List.of(new File(logDir).list()).asScala.exists {
partitionDirectoryNames =>
+ partitionDirectoryNames.exists { directoryName =>
+ directoryName.startsWith(tp.topic + "-" + tp.partition) &&
+ directoryName.endsWith(UnifiedLog.DELETE_DIR_SUFFIX)
+ }
+ }
+ }
+ }
+ ), "Failed to hard-delete the delete directory")
+ }
}
diff --git a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
index cb66e18afff..800e6c2039d 100644
--- a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
+++ b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
@@ -20,7 +20,6 @@ package kafka.network
import com.fasterxml.jackson.databind.ObjectMapper
import kafka.network
import kafka.server.EnvelopeUtils
-import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.config.{ConfigResource, SaslConfigs,
SslConfigs, TopicConfig}
@@ -29,7 +28,7 @@ import
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData._
import org.apache.kafka.common.message.{CreateTopicsRequestData,
CreateTopicsResponseData, IncrementalAlterConfigsRequestData}
import org.apache.kafka.common.network.{ClientInformation, ListenerName}
-import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.AlterConfigsRequest._
import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.{KafkaPrincipal,
KafkaPrincipalSerde, SecurityProtocol}
@@ -48,6 +47,7 @@ import java.io.IOException
import java.net.InetAddress
import java.nio.ByteBuffer
import java.util
+import java.util.Optional
import java.util.concurrent.atomic.AtomicReference
import scala.collection.Map
import scala.jdk.CollectionConverters._
@@ -260,7 +260,7 @@ class RequestChannelTest {
}
private def buildUnwrappedEnvelopeRequest(request: AbstractRequest):
RequestChannel.Request = {
- val wrappedRequest = TestUtils.buildEnvelopeRequest(
+ val wrappedRequest = buildEnvelopeRequest(
request,
principalSerde,
requestChannelMetrics,
@@ -278,6 +278,46 @@ class RequestChannelTest {
unwrappedRequest.get()
}
+ def buildEnvelopeRequest(
+ request: AbstractRequest,
+ principalSerde: KafkaPrincipalSerde,
+ requestChannelMetrics: RequestChannelMetrics,
+ startTimeNanos: Long,
+ dequeueTimeNanos: Long = -1,
+ fromPrivilegedListener: Boolean = true
+ ): RequestChannel.Request = {
+ val clientId = "id"
+ val listenerName =
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
+
+ val requestHeader = new RequestHeader(request.apiKey, request.version,
clientId, 0)
+ val requestBuffer = request.serializeWithHeader(requestHeader)
+
+ val envelopeHeader = new RequestHeader(ApiKeys.ENVELOPE,
ApiKeys.ENVELOPE.latestVersion(), clientId, 0)
+ val envelopeBuffer = new EnvelopeRequest.Builder(
+ requestBuffer,
+ principalSerde.serialize(KafkaPrincipal.ANONYMOUS),
+ InetAddress.getLocalHost.getAddress
+ ).build().serializeWithHeader(envelopeHeader)
+
+ RequestHeader.parse(envelopeBuffer)
+
+ val envelopeContext = new RequestContext(envelopeHeader, "1",
InetAddress.getLocalHost, Optional.empty(),
+ KafkaPrincipal.ANONYMOUS, listenerName, SecurityProtocol.PLAINTEXT,
ClientInformation.EMPTY,
+ fromPrivilegedListener, Optional.of(principalSerde))
+
+ val envelopRequest = new RequestChannel.Request(
+ processor = 1,
+ context = envelopeContext,
+ startTimeNanos = startTimeNanos,
+ memoryPool = MemoryPool.NONE,
+ buffer = envelopeBuffer,
+ metrics = requestChannelMetrics,
+ envelope = None
+ )
+ envelopRequest.requestDequeueTimeNanos = dequeueTimeNanos
+ envelopRequest
+ }
+
private def isValidJson(str: String): Boolean = {
try {
val mapper = new ObjectMapper
diff --git
a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
index 50bad2dfcd3..cd9afcc889b 100644
---
a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
@@ -33,6 +33,7 @@ import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.common.utils.ProducerIdAndEpoch
import
org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.IntegrationTestUtils
import org.junit.jupiter.api.Assertions.{assertEquals, fail}
@@ -70,10 +71,14 @@ class GroupCoordinatorBaseRequestTest(cluster:
ClusterInstance) {
protected def createTransactionStateTopic(): Unit = {
val admin = cluster.admin()
try {
- TestUtils.createTransactionStateTopicWithAdmin(
+ TestUtils.createTopicWithAdmin(
admin = admin,
+ topic = Topic.TRANSACTION_STATE_TOPIC_NAME,
+ numPartitions =
brokers().head.config.getInt(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG),
+ replicationFactor =
brokers().head.config.getShort(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG).toInt,
brokers = brokers(),
- controllers = controllerServers()
+ controllers = controllerServers(),
+ topicConfig = new Properties()
)
} finally {
admin.close()
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index d3009e989dc..be14bcb3314 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -17,6 +17,7 @@
package kafka.server
+import com.yammer.metrics.core.{Histogram, Meter}
import kafka.cluster.Partition
import kafka.coordinator.transaction.{InitProducerIdResult,
TransactionCoordinator}
import kafka.network.RequestChannel
@@ -96,7 +97,7 @@ import org.apache.kafka.server.authorizer.{Action,
AuthorizationResult, Authoriz
import org.apache.kafka.server.common.{FeatureVersion, FinalizedFeatures,
GroupVersion, KRaftVersion, MetadataVersion, RequestLocal, ShareVersion,
StreamsVersion, TransactionVersion}
import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs,
ServerLogConfigs}
import org.apache.kafka.server.logger.LoggingController
-import org.apache.kafka.server.metrics.ClientMetricsTestUtils
+import org.apache.kafka.server.metrics.{ClientMetricsTestUtils,
KafkaYammerMetrics}
import org.apache.kafka.server.share.{CachedSharePartition,
ErroneousAndValidPartitionData, SharePartitionKey}
import org.apache.kafka.server.quota.{ClientQuotaManager,
ControllerMutationQuota, ControllerMutationQuotaManager, ReplicaQuota,
ReplicationQuotaManager, ThrottleCallback}
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch
@@ -2262,12 +2263,22 @@ class KafkaApisTest extends Logging {
assertEquals(expectedError, error)
val metricName = if (version < 4) ApiKeys.ADD_PARTITIONS_TO_TXN.name
else RequestMetrics.VERIFY_PARTITIONS_IN_TXN_METRIC_NAME
- assertEquals(8, TestUtils.metersCount(metricName))
+ assertEquals(8, metersCount(metricName))
} finally {
requestMetrics.close()
}
}
+ private def metersCount(metricName: String): Long = {
+ KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
+ .filter { case (k, _) => k.getMBeanName.endsWith(metricName) }
+ .values.map {
+ case histogram: Histogram => histogram.count()
+ case meter: Meter => meter.count()
+ case _ => 0
+ }.sum
+ }
+
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.ADD_PARTITIONS_TO_TXN)
def testAddPartitionsToTxnOperationNotAttempted(version: Short): Unit = {
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index fc1f10fbbd7..b55661b95ef 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -16,8 +16,7 @@
*/
package kafka.utils
-import com.yammer.metrics.core.{Histogram, Meter}
-import kafka.network.RequestChannel
+import com.yammer.metrics.core.Meter
import kafka.security.JaasTestUtils
import kafka.server._
import kafka.utils.Implicits._
@@ -32,15 +31,12 @@ import org.apache.kafka.common.config.{ConfigException,
ConfigResource}
import org.apache.kafka.common.errors.{TopicExistsException,
UnknownTopicOrPartitionException}
import org.apache.kafka.common.header.Header
import org.apache.kafka.common.internals.{Plugin, Topic}
-import org.apache.kafka.common.memory.MemoryPool
-import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.network.{ClientInformation, ConnectionMode,
ListenerName}
-import org.apache.kafka.common.protocol.ApiKeys
-import org.apache.kafka.common.record.internal._
+import org.apache.kafka.common.network.{ConnectionMode, ListenerName}
import org.apache.kafka.common.record.TimestampType
+import org.apache.kafka.common.record.internal._
import org.apache.kafka.common.requests._
import org.apache.kafka.common.resource.ResourcePattern
-import org.apache.kafka.common.security.auth.{KafkaPrincipal,
KafkaPrincipalSerde, SecurityProtocol}
+import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.serialization._
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.utils.Utils.formatAddress
@@ -48,14 +44,12 @@ import
org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.metadata.{ConfigRepository, LeaderAndIsr,
MockConfigRepository}
import org.apache.kafka.network.SocketServerConfigs
-import org.apache.kafka.network.metrics.RequestChannelMetrics
import org.apache.kafka.raft.{KRaftConfigs, QuorumConfig}
-import org.apache.kafka.server.authorizer.{AuthorizableRequestContext,
Authorizer => JAuthorizer}
+import org.apache.kafka.server.authorizer.{Authorizer => JAuthorizer}
import org.apache.kafka.server.config.{DelegationTokenManagerConfigs,
ReplicationConfigs, ServerConfigs, ServerLogConfigs}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.MockTime
-import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile
-import org.apache.kafka.storage.internals.log.{CleanerConfig, LogCleaner,
LogConfig, LogDirFailureChannel, LogManager, ProducerStateManagerConfig,
UnifiedLog}
+import org.apache.kafka.storage.internals.log._
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.apache.kafka.test.{TestUtils => JTestUtils}
import org.junit.jupiter.api.Assertions._
@@ -63,9 +57,8 @@ import org.mockito.ArgumentMatchers.{any, anyBoolean}
import org.mockito.Mockito
import java.io._
-import java.net.InetAddress
import java.nio._
-import java.nio.charset.{Charset, StandardCharsets}
+import java.nio.charset.StandardCharsets
import java.nio.file.{Files, StandardOpenOption}
import java.time.Duration
import java.util
@@ -78,7 +71,6 @@ import scala.concurrent.{Await, ExecutionContext, Future}
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.RichOption
import scala.jdk.javaapi.OptionConverters
-import scala.util.{Failure, Success, Try}
/**
* Utility functions to help with testing
@@ -99,7 +91,6 @@ object TestUtils extends Logging {
private val transactionStatusKey = "transactionStatus"
private val committedValue : Array[Byte] =
"committed".getBytes(StandardCharsets.UTF_8)
- private val abortedValue : Array[Byte] =
"aborted".getBytes(StandardCharsets.UTF_8)
sealed trait LogDirFailureType
case object Roll extends LogDirFailureType
@@ -131,11 +122,6 @@ object TestUtils extends Logging {
*/
def tempFile(prefix: String, suffix: String): File =
JTestUtils.tempFile(prefix, suffix)
- def tempPropertiesFile(properties: Map[String, String]): File = {
- val content = properties.map{case (k, v) => k + "=" +
v}.mkString(System.lineSeparator())
- JTestUtils.tempFile(content)
- }
-
/**
* Create a test config for the provided parameters.
*
@@ -440,23 +426,6 @@ object TestUtils extends Logging {
)
}
- def createTransactionStateTopicWithAdmin[B <: KafkaBroker](
- admin: Admin,
- brokers: Seq[B],
- controllers: Seq[ControllerServer]
- ): Map[Int, Int] = {
- val broker = brokers.head
- createTopicWithAdmin(
- admin = admin,
- topic = Topic.TRANSACTION_STATE_TOPIC_NAME,
- numPartitions =
broker.config.getInt(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG),
- replicationFactor =
broker.config.getShort(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG).toInt,
- brokers = brokers,
- controllers = controllers,
- topicConfig = new Properties(),
- )
- }
-
def deleteTopicWithAdmin[B <: KafkaBroker](
admin: Admin,
topic: String,
@@ -576,7 +545,7 @@ object TestUtils extends Logging {
*
* @return The new leader (note that negative values are used to indicate
conditions like NoLeader and
* LeaderDuringDelete).
- * @throws AssertionError if the expected condition is not true within the
timeout.
+ * @throws java.lang.AssertionError if the expected condition is not true
within the timeout.
*/
def waitUntilLeaderIsElectedOrChangedWithAdmin(
admin: Admin,
@@ -597,17 +566,6 @@ object TestUtils extends Logging {
}
}
}
- doWaitUntilLeaderIsElectedOrChanged(getPartitionLeader, topic, partition,
timeoutMs, oldLeaderOpt, newLeaderOpt)
- }
-
- private def doWaitUntilLeaderIsElectedOrChanged(
- getPartitionLeader: (String, Int) => Option[Int],
- topic: String,
- partition: Int,
- timeoutMs: Long,
- oldLeaderOpt: Option[Int],
- newLeaderOpt: Option[Int]
- ): Int = {
require(!(oldLeaderOpt.isDefined && newLeaderOpt.isDefined), "Can't define
both the old and the new leader")
val startTime = System.currentTimeMillis()
val topicPartition = new TopicPartition(topic, partition)
@@ -804,21 +762,6 @@ object TestUtils extends Logging {
.getOrElse(throw new AssertionError(s"Unable to locate follower for
$topicPartition"))
}
- /**
- * Wait until all brokers know about each other.
- *
- * @param brokers The Kafka brokers.
- * @param timeout The amount of time waiting on this condition before
assert to fail
- */
- def waitUntilBrokerMetadataIsPropagated[B <: KafkaBroker](
- brokers: Seq[B],
- timeout: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Unit = {
- val expectedBrokerIds = brokers.map(_.config.brokerId).toSet
- waitUntilTrue(() => brokers.forall(server =>
-
expectedBrokerIds.forall(server.dataPlaneRequestProcessor.metadataCache.hasAliveBroker(_))
- ), "Timed out waiting for broker metadata to propagate to all servers",
timeout)
- }
-
/**
* Wait until the expected number of partitions is in the metadata cache in
each broker.
*
@@ -1041,58 +984,6 @@ object TestUtils extends Logging {
}
}
- def verifyTopicDeletion[B <: KafkaBroker](
- topic: String,
- numPartitions: Int,
- brokers: Seq[B]): Unit = {
- val topicPartitions = (0 until numPartitions).map(new
TopicPartition(topic, _))
- // ensure that the topic-partition has been deleted from all brokers'
replica managers
- waitUntilTrue(() =>
- brokers.forall(broker => topicPartitions.forall(tp =>
broker.replicaManager.onlinePartition(tp).isEmpty)),
- "Replica manager's should have deleted all of this topic's partitions")
- // ensure that logs from all replicas are deleted
- waitUntilTrue(() => brokers.forall(broker => topicPartitions.forall(tp =>
broker.logManager.getLog(tp).isEmpty)),
- "Replica logs not deleted after delete topic is complete")
- // ensure that topic is removed from all cleaner offsets
- waitUntilTrue(() => brokers.forall(broker => topicPartitions.forall { tp =>
- val checkpoints = broker.logManager.liveLogDirs.asScala.map { logDir =>
- new OffsetCheckpointFile(new File(logDir,
"cleaner-offset-checkpoint"), null).read()
- }
- checkpoints.forall(checkpointsPerLogDir =>
!checkpointsPerLogDir.containsKey(tp))
- }), "Cleaner offset for deleted partition should have been removed")
- waitUntilTrue(() => brokers.forall(broker =>
- broker.config.logDirs.stream().allMatch { logDir =>
- topicPartitions.forall { tp =>
- !new File(logDir, tp.topic + "-" + tp.partition).exists()
- }
- }
- ), "Failed to soft-delete the data to a delete directory")
- waitUntilTrue(() => brokers.forall(broker =>
- broker.config.logDirs.stream().allMatch { logDir =>
- topicPartitions.forall { tp =>
- !util.List.of(new File(logDir).list()).asScala.exists {
partitionDirectoryNames =>
- partitionDirectoryNames.exists { directoryName =>
- directoryName.startsWith(tp.topic + "-" + tp.partition) &&
- directoryName.endsWith(UnifiedLog.DELETE_DIR_SUFFIX)
- }
- }
- }
- }
- ), "Failed to hard-delete the delete directory")
- }
-
- /**
- * Translate the given buffer into a string
- *
- * @param buffer The buffer to translate
- * @param encoding The encoding to use in translating bytes to characters
- */
- def readString(buffer: ByteBuffer, encoding: String =
Charset.defaultCharset.toString): String = {
- val bytes = new Array[Byte](buffer.remaining)
- buffer.get(bytes)
- new String(bytes, encoding)
- }
-
def waitAndVerifyAcls(expected: Set[AccessControlEntry],
authorizerPlugin: Plugin[JAuthorizer],
resource: ResourcePattern,
@@ -1112,7 +1003,7 @@ object TestUtils extends Logging {
45000)
}
- def consumeTopicRecords[K, V, B <: KafkaBroker](
+ def consumeTopicRecords[B <: KafkaBroker](
brokers: Seq[B],
topic: String,
numMessages: Int,
@@ -1223,7 +1114,7 @@ object TestUtils extends Logging {
override def value() = if (willBeCommitted)
committedValue
else
- abortedValue
+ "aborted".getBytes(StandardCharsets.UTF_8)
}
new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, key, value,
util.Set.of(header))
}
@@ -1270,86 +1161,6 @@ object TestUtils extends Logging {
adminClient.incrementalAlterConfigs(configs)
}
- def assertLeader(client: Admin, topicPartition: TopicPartition,
expectedLeader: Int): Unit = {
- waitForLeaderToBecome(client, topicPartition, Some(expectedLeader))
- }
-
- def assertNoLeader(client: Admin, topicPartition: TopicPartition): Unit = {
- waitForLeaderToBecome(client, topicPartition, None)
- }
-
- def waitForOnlineBroker(client: Admin, brokerId: Int): Unit = {
- waitUntilTrue(() => {
- val nodes = client.describeCluster().nodes().get()
- nodes.asScala.exists(_.id == brokerId)
- }, s"Timed out waiting for brokerId $brokerId to come online")
- }
-
- def waitForLeaderToBecome(
- client: Admin,
- topicPartition: TopicPartition,
- expectedLeaderOpt: Option[Int]
- ): Unit = {
- val topic = topicPartition.topic
- val partitionId = topicPartition.partition
-
- def currentLeader: Try[Option[Int]] = Try {
- val topicDescription =
client.describeTopics(util.List.of(topic)).allTopicNames.get.get(topic)
- topicDescription.partitions.asScala
- .find(_.partition == partitionId)
- .flatMap(partitionState => Option(partitionState.leader))
- .map(_.id)
- }
-
- val (lastLeaderCheck, isLeaderElected) = computeUntilTrue(currentLeader) {
- case Success(leaderOpt) => leaderOpt == expectedLeaderOpt
- case Failure(e: ExecutionException) if
e.getCause.isInstanceOf[UnknownTopicOrPartitionException] => false
- case Failure(e) => throw e
- }
-
- assertTrue(isLeaderElected, s"Timed out waiting for leader to become
$expectedLeaderOpt. " +
- s"Last metadata lookup returned leader =
${lastLeaderCheck.getOrElse("unknown")}")
- }
-
- def waitForBrokersOutOfIsr(client: Admin, partition: Set[TopicPartition],
brokerIds: Set[Int]): Unit = {
- waitUntilTrue(
- () => {
- val description =
client.describeTopics(partition.map(_.topic).asJava).allTopicNames.get.asScala
- val isr = description
- .values
- .flatMap(_.partitions.asScala.flatMap(_.isr.asScala))
- .map(_.id)
- .toSet
-
- brokerIds.intersect(isr).isEmpty
- },
- s"Expected brokers $brokerIds to no longer be in the ISR for $partition"
- )
- }
-
- def currentIsr(admin: Admin, partition: TopicPartition): Set[Int] = {
- val description = admin.describeTopics(util.Set.of(partition.topic))
- .allTopicNames
- .get
- .asScala
-
- description
- .values
- .flatMap(_.partitions.asScala.flatMap(_.isr.asScala))
- .map(_.id)
- .toSet
- }
-
- def waitForBrokersInIsr(client: Admin, partition: TopicPartition, brokerIds:
Set[Int]): Unit = {
- waitUntilTrue(
- () => {
- val isr = currentIsr(client, partition)
- brokerIds.subsetOf(isr)
- },
- s"Expected brokers $brokerIds to be in the ISR for $partition"
- )
- }
-
def assertBadConfigContainingMessage(props: Properties,
expectedExceptionContainsText: String): Unit = {
try {
KafkaConfig.fromProps(props)
@@ -1364,105 +1175,18 @@ object TestUtils extends Logging {
}
def totalMetricValue(broker: KafkaBroker, metricName: String): Long = {
- totalMetricValue(broker.metrics, metricName)
- }
-
- def totalMetricValue(metrics: Metrics, metricName: String): Long = {
- val allMetrics = metrics.metrics
+ val allMetrics = broker.metrics.metrics
val total = allMetrics.values().asScala.filter(_.metricName().name() ==
metricName)
.foldLeft(0.0)((total, metric) => total +
metric.metricValue.asInstanceOf[Double])
total.toLong
}
def meterCount(metricName: String): Long = {
- meterCountOpt(metricName).getOrElse(fail(s"Unable to find metric
$metricName"))
- }
-
- def meterCountOpt(metricName: String): Option[Long] = {
KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
.filter { case (k, _) => k.getMBeanName.endsWith(metricName) }
.values
.headOption
.map(_.asInstanceOf[Meter].count)
- }
-
- def metersCount(metricName: String): Long = {
- KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
- .filter { case (k, _) => k.getMBeanName.endsWith(metricName) }
- .values.map {
- case histogram: Histogram => histogram.count()
- case meter: Meter => meter.count()
- case _ => 0
- }.sum
- }
-
- /**
- * Find an Authorizer that we can call createAcls or deleteAcls on.
- */
- def pickAuthorizerForWrite[B <: KafkaBroker](
- brokers: Seq[B],
- controllers: Seq[ControllerServer],
- ): JAuthorizer = {
- if (controllers.isEmpty) {
- brokers.head.authorizerPlugin.get.get
- } else {
- var result: JAuthorizer = null
- TestUtils.retry(120000) {
- val active = controllers.filter(_.controller.isActive).head
- result = active.authorizerPlugin.get.get
- }
- result
- }
- }
-
- val anonymousAuthorizableContext = new AuthorizableRequestContext() {
- override def listenerName(): String = ""
- override def securityProtocol(): SecurityProtocol =
SecurityProtocol.PLAINTEXT
- override def principal(): KafkaPrincipal = KafkaPrincipal.ANONYMOUS
- override def clientAddress(): InetAddress = null
- override def requestType(): Int = 0
- override def requestVersion(): Int = 0
- override def clientId(): String = ""
- override def correlationId(): Int = 0
- }
-
- def buildEnvelopeRequest(
- request: AbstractRequest,
- principalSerde: KafkaPrincipalSerde,
- requestChannelMetrics: RequestChannelMetrics,
- startTimeNanos: Long,
- dequeueTimeNanos: Long = -1,
- fromPrivilegedListener: Boolean = true
- ): RequestChannel.Request = {
- val clientId = "id"
- val listenerName =
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
-
- val requestHeader = new RequestHeader(request.apiKey, request.version,
clientId, 0)
- val requestBuffer = request.serializeWithHeader(requestHeader)
-
- val envelopeHeader = new RequestHeader(ApiKeys.ENVELOPE,
ApiKeys.ENVELOPE.latestVersion(), clientId, 0)
- val envelopeBuffer = new EnvelopeRequest.Builder(
- requestBuffer,
- principalSerde.serialize(KafkaPrincipal.ANONYMOUS),
- InetAddress.getLocalHost.getAddress
- ).build().serializeWithHeader(envelopeHeader)
-
- RequestHeader.parse(envelopeBuffer)
-
- val envelopeContext = new RequestContext(envelopeHeader, "1",
InetAddress.getLocalHost, Optional.empty(),
- KafkaPrincipal.ANONYMOUS, listenerName, SecurityProtocol.PLAINTEXT,
ClientInformation.EMPTY,
- fromPrivilegedListener, Optional.of(principalSerde))
-
- val envelopRequest = new RequestChannel.Request(
- processor = 1,
- context = envelopeContext,
- startTimeNanos = startTimeNanos,
- memoryPool = MemoryPool.NONE,
- buffer = envelopeBuffer,
- metrics = requestChannelMetrics,
- envelope = None
- )
- envelopRequest.requestDequeueTimeNanos = dequeueTimeNanos
- envelopRequest
+ .getOrElse(fail(s"Unable to find metric $metricName"))
}
}
diff --git a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java
index e815514aac3..29092e0c304 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java
@@ -72,8 +72,6 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import scala.jdk.javaapi.CollectionConverters;
-
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -791,7 +789,7 @@ public class ConfigCommandTest {
addedConfigs.put("delete.retention.ms", "1000000");
addedConfigs.put("min.insync.replicas", "2");
if (file) {
- File f =
kafka.utils.TestUtils.tempPropertiesFile(CollectionConverters.asScala(addedConfigs));
+ File f = ToolsTestUtils.tempPropertiesFile(addedConfigs);
filePath = f.getPath();
}
@@ -1439,7 +1437,6 @@ public class ConfigCommandTest {
return res;
}
-
static class DummyAdminClient extends MockAdminClient {
public DummyAdminClient(Node node) {
super(List.of(node), node);
diff --git
a/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java
index 2f5f92a6ee7..d3d646cae7e 100644
--- a/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java
@@ -16,12 +16,12 @@
*/
package org.apache.kafka.tools;
-import kafka.utils.TestUtils;
-
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.test.ClusterInstance;
@@ -29,6 +29,7 @@ import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.common.utils.internals.Exit;
+import org.apache.kafka.test.TestUtils;
import org.mockito.ArgumentCaptor;
import org.mockito.MockedStatic;
@@ -38,6 +39,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -46,8 +48,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
-
-import scala.jdk.javaapi.CollectionConverters;
+import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -71,7 +72,7 @@ public class LeaderElectionCommandTest {
}
@ClusterTest
- public void testAllTopicPartition() throws InterruptedException,
ExecutionException {
+ public void testAllTopicPartition() throws Exception {
String topic = "unclean-topic";
int partition = 0;
List<Integer> assignment = List.of(broker2, broker3);
@@ -83,16 +84,16 @@ public class LeaderElectionCommandTest {
TopicPartition topicPartition = new TopicPartition(topic,
partition);
- TestUtils.assertLeader(client, topicPartition, broker2);
+ assertLeader(client, topicPartition, broker2);
cluster.shutdownBroker(broker3);
- TestUtils.waitForBrokersOutOfIsr(client,
-
CollectionConverters.asScala(List.of(topicPartition)).toSet(),
- CollectionConverters.asScala(List.of(broker3)).toSet()
+ waitForBrokersOutOfIsr(client,
+ Set.of(topicPartition),
+ Set.of(broker3)
);
cluster.shutdownBroker(broker2);
- TestUtils.assertNoLeader(client, topicPartition);
+ assertNoLeader(client, topicPartition);
cluster.startBroker(broker3);
- TestUtils.waitForOnlineBroker(client, broker3);
+ waitForOnlineBroker(client, broker3);
assertEquals(0, LeaderElectionCommand.mainNoExit(
"--bootstrap-server", cluster.bootstrapServers(),
@@ -100,7 +101,7 @@ public class LeaderElectionCommandTest {
"--all-topic-partitions"
));
- TestUtils.assertLeader(client, topicPartition, broker3);
+ assertLeader(client, topicPartition, broker3);
}
}
@@ -178,7 +179,7 @@ public class LeaderElectionCommandTest {
}
@ClusterTest
- public void testTopicPartition() throws InterruptedException,
ExecutionException {
+ public void testTopicPartition() throws Exception {
String topic = "unclean-topic";
int partition = 0;
List<Integer> assignment = List.of(broker2, broker3);
@@ -189,17 +190,17 @@ public class LeaderElectionCommandTest {
TopicPartition topicPartition = new TopicPartition(topic,
partition);
- TestUtils.assertLeader(client, topicPartition, broker2);
+ assertLeader(client, topicPartition, broker2);
cluster.shutdownBroker(broker3);
- TestUtils.waitForBrokersOutOfIsr(client,
-
CollectionConverters.asScala(List.of(topicPartition)).toSet(),
- CollectionConverters.asScala(List.of(broker3)).toSet()
+ waitForBrokersOutOfIsr(client,
+ Set.of(topicPartition),
+ Set.of(broker3)
);
cluster.shutdownBroker(broker2);
- TestUtils.assertNoLeader(client, topicPartition);
+ assertNoLeader(client, topicPartition);
cluster.startBroker(broker3);
- TestUtils.waitForOnlineBroker(client, broker3);
+ waitForOnlineBroker(client, broker3);
assertEquals(0, LeaderElectionCommand.mainNoExit(
"--bootstrap-server", cluster.bootstrapServers(),
@@ -208,7 +209,7 @@ public class LeaderElectionCommandTest {
"--partition", Integer.toString(partition)
));
- TestUtils.assertLeader(client, topicPartition, broker3);
+ assertLeader(client, topicPartition, broker3);
}
}
@@ -227,17 +228,17 @@ public class LeaderElectionCommandTest {
TopicPartition topicPartition = new TopicPartition(topic,
partition);
- TestUtils.assertLeader(client, topicPartition, broker2);
+ assertLeader(client, topicPartition, broker2);
cluster.shutdownBroker(broker3);
- TestUtils.waitForBrokersOutOfIsr(client,
-
CollectionConverters.asScala(List.of(topicPartition)).toSet(),
- CollectionConverters.asScala(List.of(broker3)).toSet()
+ waitForBrokersOutOfIsr(client,
+ Set.of(topicPartition),
+ Set.of(broker3)
);
cluster.shutdownBroker(broker2);
- TestUtils.assertNoLeader(client, topicPartition);
+ assertNoLeader(client, topicPartition);
cluster.startBroker(broker3);
- TestUtils.waitForOnlineBroker(client, broker3);
+ waitForOnlineBroker(client, broker3);
Path topicPartitionPath =
tempTopicPartitionFile(List.of(topicPartition));
@@ -247,12 +248,12 @@ public class LeaderElectionCommandTest {
"--path-to-json-file", topicPartitionPath.toString()
));
- TestUtils.assertLeader(client, topicPartition, broker3);
+ assertLeader(client, topicPartition, broker3);
}
}
@ClusterTest
- public void testPreferredReplicaElection() throws InterruptedException,
ExecutionException {
+ public void testPreferredReplicaElection() throws Exception {
String topic = "preferred-topic";
int partition = 0;
List<Integer> assignment = List.of(broker2, broker3);
@@ -266,14 +267,12 @@ public class LeaderElectionCommandTest {
TopicPartition topicPartition = new TopicPartition(topic,
partition);
- TestUtils.assertLeader(client, topicPartition, broker2);
+ assertLeader(client, topicPartition, broker2);
cluster.shutdownBroker(broker2);
- TestUtils.assertLeader(client, topicPartition, broker3);
+ assertLeader(client, topicPartition, broker3);
cluster.startBroker(broker2);
- TestUtils.waitForBrokersInIsr(client, topicPartition,
- CollectionConverters.asScala(List.of(broker2)).toSet()
- );
+ waitForBrokersInIsr(client, topicPartition, Set.of(broker2));
assertEquals(0, LeaderElectionCommand.mainNoExit(
"--bootstrap-server", cluster.bootstrapServers(),
@@ -282,7 +281,7 @@ public class LeaderElectionCommandTest {
"--partition", Integer.toString(partition)
));
- TestUtils.assertLeader(client, topicPartition, broker2);
+ assertLeader(client, topicPartition, broker2);
}
}
@@ -319,18 +318,14 @@ public class LeaderElectionCommandTest {
topicPartition0 = new TopicPartition(topic, partition0);
topicPartition1 = new TopicPartition(topic, partition1);
- TestUtils.assertLeader(client, topicPartition0, broker2);
- TestUtils.assertLeader(client, topicPartition1, broker3);
+ assertLeader(client, topicPartition0, broker2);
+ assertLeader(client, topicPartition1, broker3);
cluster.shutdownBroker(broker2);
- TestUtils.assertLeader(client, topicPartition0, broker3);
+ assertLeader(client, topicPartition0, broker3);
cluster.startBroker(broker2);
- TestUtils.waitForBrokersInIsr(client, topicPartition0,
- CollectionConverters.asScala(List.of(broker2)).toSet()
- );
- TestUtils.waitForBrokersInIsr(client, topicPartition1,
- CollectionConverters.asScala(List.of(broker2)).toSet()
- );
+ waitForBrokersInIsr(client, topicPartition0, Set.of(broker2));
+ waitForBrokersInIsr(client, topicPartition1, Set.of(broker2));
}
Path topicPartitionPath =
tempTopicPartitionFile(List.of(topicPartition0, topicPartition1));
@@ -396,4 +391,77 @@ public class LeaderElectionCommandTest {
sb.append("]}");
return sb.toString();
}
+
+ private void waitForBrokersOutOfIsr(
+ Admin client,
+ Set<TopicPartition> partitions,
+ Set<Integer> brokerIds
+ ) throws InterruptedException {
+ TestUtils.waitForCondition(
+ () -> {
+ Set<String> topics = partitions.stream()
+ .map(TopicPartition::topic)
+ .collect(Collectors.toSet());
+
+ Map<String, TopicDescription> description = client.describeTopics(topics).allTopicNames().get();
+
+ Set<Integer> isr = description.entrySet().stream()
+ .flatMap(e -> e.getValue().partitions().stream()
+ .filter(info -> partitions.contains(new TopicPartition(e.getKey(), info.partition())))
+ .flatMap(info -> info.isr().stream()))
+ .map(Node::id)
+ .collect(Collectors.toSet());
+
+ return Collections.disjoint(brokerIds, isr);
+ },
+ "Expected brokers " + brokerIds + " to no longer be in the ISR for " + partitions
+ );
+ }
+
+ private void waitForBrokersInIsr(Admin client, TopicPartition partition, Set<Integer> brokerIds) throws InterruptedException {
+ TestUtils.waitForCondition(
+ () -> {
+ Set<Integer> isr = client.describeTopics(Set.of(partition.topic()))
+ .allTopicNames()
+ .get()
+ .get(partition.topic())
+ .partitions().stream()
+ .filter(info -> info.partition() == partition.partition())
+ .flatMap(info -> info.isr().stream())
+ .map(Node::id)
+ .collect(Collectors.toSet());
+
+ return isr.containsAll(brokerIds);
+ },
+ "Expected brokers " + brokerIds + " to be in the ISR for " + partition
+ );
+ }
+
+ private void waitForOnlineBroker(Admin client, int brokerId) throws InterruptedException {
+ TestUtils.waitForCondition(
+ () -> client.describeCluster().nodes().get().stream()
+ .anyMatch(node -> node.id() == brokerId),
+ "Timed out waiting for brokerId " + brokerId + " to come online"
+ );
+ }
+
+ private void assertLeader(Admin client, TopicPartition topicPartition, int expectedLeader) throws Exception {
+ int leader = cluster.waitUntilLeaderIsElectedOrChangedWithAdmin(client, topicPartition.topic(), topicPartition.partition(), 30000);
+ assertEquals(expectedLeader, leader);
+ }
+
+ private void assertNoLeader(Admin client, TopicPartition topicPartition) throws InterruptedException {
+ TestUtils.waitForCondition(
+ () -> {
+ TopicDescription desc = client.describeTopics(List.of(topicPartition.topic()))
+ .allTopicNames().get().get(topicPartition.topic());
+ return desc.partitions().stream()
+ .filter(p -> p.partition() == topicPartition.partition())
+ .findFirst()
+ .map(p -> p.leader() == null || p.leader().id() == Node.noNode().id())
+ .orElse(false);
+ },
+ "Timed out waiting for no leader for " + topicPartition
+ );
+ }
}