This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c72fcda85cc MINOR: Cleanup TestUtils.scala (#22091)
c72fcda85cc is described below

commit c72fcda85cc39d33738c16311d9268af8dcacdfe
Author: Ken Huang <[email protected]>
AuthorDate: Sat May 2 10:10:22 2026 +0800

    MINOR: Cleanup TestUtils.scala (#22091)
    
    Some methods are only used in a single class; they should be moved into
    that class. Java tests should not use TestUtils.scala.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../kafka/api/AuthorizerIntegrationTest.scala      |   4 +-
 .../kafka/api/BaseAdminIntegrationTest.scala       |   5 +-
 .../kafka/api/PlaintextAdminIntegrationTest.scala  | 192 +++++++++----
 .../kafka/integration/KafkaServerTestHarness.scala |  31 ++-
 .../test/scala/unit/kafka/log/LogCleanerTest.scala |   6 +-
 .../test/scala/unit/kafka/log/LogTestUtils.scala   |   6 +-
 .../scala/unit/kafka/metrics/MetricsTest.scala     |  44 ++-
 .../unit/kafka/network/RequestChannelTest.scala    |  46 +++-
 .../server/GroupCoordinatorBaseRequestTest.scala   |   9 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  15 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    | 300 +--------------------
 .../org/apache/kafka/tools/ConfigCommandTest.java  |   5 +-
 .../kafka/tools/LeaderElectionCommandTest.java     | 154 ++++++++---
 13 files changed, 406 insertions(+), 411 deletions(-)

diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 161a6bf245d..d6c94d5baa4 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -4189,11 +4189,11 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
   }
 
   def removeAllClientAcls(): Unit = {
-    val authorizerForWrite = TestUtils.pickAuthorizerForWrite(brokers, 
controllerServers)
+    val authorizerForWrite = pickAuthorizerForWrite(brokers, controllerServers)
     val aclEntryFilter = new AccessControlEntryFilter(clientPrincipalString, 
null, AclOperation.ANY, AclPermissionType.ANY)
     val aclFilter = new AclBindingFilter(ResourcePatternFilter.ANY, 
aclEntryFilter)
 
-    authorizerForWrite.deleteAcls(TestUtils.anonymousAuthorizableContext, 
java.util.List.of(aclFilter)).asScala.
+    authorizerForWrite.deleteAcls(anonymousAuthorizableContext, 
java.util.List.of(aclFilter)).asScala.
       map(_.toCompletableFuture.get).flatMap { deletion =>
         
deletion.aclBindingDeleteResults().asScala.map(_.aclBinding.pattern).toSet
       }.foreach { resource =>
diff --git 
a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
index 16dec9dc008..5760042bbd5 100644
--- a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
@@ -58,7 +58,10 @@ abstract class BaseAdminIntegrationTest extends 
IntegrationTestHarness with Logg
   override def setUp(testInfo: TestInfo): Unit = {
     this.testInfo = testInfo
     super.setUp(testInfo)
-    waitUntilBrokerMetadataIsPropagated(brokers)
+    val expectedBrokerIds = brokers.map(_.config.brokerId).toSet
+    waitUntilTrue(() => brokers.forall(server =>
+      
expectedBrokerIds.forall(server.dataPlaneRequestProcessor.metadataCache.hasAliveBroker(_))
+    ), "Timed out waiting for broker metadata to propagate to all servers", 
15000)
   }
 
   @AfterEach
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index b118a4ec02d..cadcddd99a5 100644
--- 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -70,7 +70,7 @@ import scala.collection.Seq
 import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, Future}
 import scala.jdk.CollectionConverters._
-import scala.util.{Random, Using}
+import scala.util.{Failure, Random, Success, Try, Using}
 
 /**
  * An integration test of the KafkaAdminClient.
@@ -1552,7 +1552,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     val result1 = client.deleteRecords(util.Map.of(topicPartition, 
RecordsToDelete.beforeOffset(117L)))
     result1.all().get()
     restartDeadBrokers()
-    TestUtils.waitForBrokersInIsr(client, topicPartition, Set(followerIndex))
+    waitForBrokersInIsr(client, topicPartition, Set(followerIndex))
     waitForFollowerLog(expectedStartOffset=117L, expectedEndOffset=200L)
   }
 
@@ -1675,7 +1675,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     // delete records in corrupt segment (the first segment)
     client.deleteRecords(util.Map.of(topicPartition, 
RecordsToDelete.beforeOffset(firstSegmentRecordsSize))).all.get
     // verify reassignment is finished after delete records
-    TestUtils.waitForBrokersInIsr(client, topicPartition, 
Set(partitionLeaderId, partitionFollowerId))
+    waitForBrokersInIsr(client, topicPartition, Set(partitionLeaderId, 
partitionFollowerId))
     // seek to beginning and make sure we can consume all records
     consumer.seekToBeginning(util.List.of(topicPartition))
     assertEquals(19, TestUtils.consumeRecords(consumer, 20 - 
firstSegmentRecordsSize).last.offset())
@@ -3152,25 +3152,25 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
         s"Expected preferred leader to become $preferred, but is 
${preferredLeader(partition1)} and ${preferredLeader(partition2)}",
         10000)
       // Check the leader hasn't moved
-      TestUtils.assertLeader(client, partition1, prior1)
-      TestUtils.assertLeader(client, partition2, prior2)
+      assertLeader(client, partition1, prior1)
+      assertLeader(client, partition2, prior2)
     }
 
     // Check current leaders are 0
-    TestUtils.assertLeader(client, partition1, 0)
-    TestUtils.assertLeader(client, partition2, 0)
+    assertLeader(client, partition1, 0)
+    assertLeader(client, partition2, 0)
 
     // Noop election
     var electResult = client.electLeaders(ElectionType.PREFERRED, 
util.Set.of(partition1))
     val exception = electResult.partitions.get.get(partition1).get
     assertEquals(classOf[ElectionNotNeededException], exception.getClass)
-    TestUtils.assertLeader(client, partition1, 0)
+    assertLeader(client, partition1, 0)
 
     // Noop election with null partitions
     electResult = client.electLeaders(ElectionType.PREFERRED, null)
     assertTrue(electResult.partitions.get.isEmpty)
-    TestUtils.assertLeader(client, partition1, 0)
-    TestUtils.assertLeader(client, partition2, 0)
+    assertLeader(client, partition1, 0)
+    assertLeader(client, partition2, 0)
 
     // Now change the preferred leader to 1
     waitForBrokerMetadataPropagation(partition1)
@@ -3182,18 +3182,18 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     assertEquals(util.Set.of(partition1), electResult.partitions.get.keySet)
     electResult.partitions.get.get(partition1)
       .ifPresent(t => fail(s"Unexpected exception during leader election: $t 
for partition $partition1"))
-    TestUtils.assertLeader(client, partition1, 1)
+    assertLeader(client, partition1, 1)
 
     // topic 2 unchanged
     assertFalse(electResult.partitions.get.containsKey(partition2))
-    TestUtils.assertLeader(client, partition2, 0)
+    assertLeader(client, partition2, 0)
 
     // meaningful election with null partitions
     electResult = client.electLeaders(ElectionType.PREFERRED, null)
     assertEquals(Set(partition2), electResult.partitions.get.keySet.asScala)
     electResult.partitions.get.get(partition2)
       .ifPresent(t => fail(s"Unexpected exception during leader election: $t 
for partition $partition2"))
-    TestUtils.assertLeader(client, partition2, 1)
+    assertLeader(client, partition2, 1)
 
     def assertUnknownTopicOrPartition(
       topicPartition: TopicPartition,
@@ -3209,8 +3209,8 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     electResult = client.electLeaders(ElectionType.PREFERRED, 
util.Set.of(unknownPartition))
     assertEquals(util.Set.of(unknownPartition), 
electResult.partitions.get.keySet)
     assertUnknownTopicOrPartition(unknownPartition, electResult)
-    TestUtils.assertLeader(client, partition1, 1)
-    TestUtils.assertLeader(client, partition2, 1)
+    assertLeader(client, partition1, 1)
+    assertLeader(client, partition2, 1)
 
     // Now change the preferred leader to 2
     waitForBrokerMetadataPropagation(partition1)
@@ -3220,15 +3220,15 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     // mixed results
     electResult = client.electLeaders(ElectionType.PREFERRED, 
util.Set.of(unknownPartition, partition1))
     assertEquals(util.Set.of(unknownPartition, partition1), 
electResult.partitions.get.keySet)
-    TestUtils.assertLeader(client, partition1, 2)
-    TestUtils.assertLeader(client, partition2, 1)
+    assertLeader(client, partition1, 2)
+    assertLeader(client, partition2, 1)
     assertUnknownTopicOrPartition(unknownPartition, electResult)
 
     // elect preferred leader for partition 2
     electResult = client.electLeaders(ElectionType.PREFERRED, 
util.Set.of(partition2))
     assertEquals(util.Set.of(partition2), electResult.partitions.get.keySet)
     assertFalse(electResult.partitions.get.get(partition2).isPresent)
-    TestUtils.assertLeader(client, partition2, 2)
+    assertLeader(client, partition2, 2)
 
     // Now change the preferred leader to 1
     waitForBrokerMetadataPropagation(partition1)
@@ -3238,7 +3238,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     killBroker(1)
     waitForBrokerMetadataPropagation(partition1)
     waitForBrokerMetadataPropagation(partition2)
-    TestUtils.waitForBrokersOutOfIsr(client, Set(partition1, partition2), 
Set(1))
+    waitForBrokersOutOfIsr(client, Set(partition1, partition2), Set(1))
 
     def assertPreferredLeaderNotAvailable(
       topicPartition: TopicPartition,
@@ -3257,17 +3257,17 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     assertEquals(util.Set.of(partition1), electResult.partitions.get.keySet)
 
     assertPreferredLeaderNotAvailable(partition1, electResult)
-    TestUtils.assertLeader(client, partition1, 2)
+    assertLeader(client, partition1, 2)
 
     // preferred leader unavailable with null argument
     electResult = client.electLeaders(ElectionType.PREFERRED, null, 
shortTimeout)
     assertTrue(Set(partition1, 
partition2).subsetOf(electResult.partitions.get.keySet.asScala))
 
     assertPreferredLeaderNotAvailable(partition1, electResult)
-    TestUtils.assertLeader(client, partition1, 2)
+    assertLeader(client, partition1, 2)
 
     assertPreferredLeaderNotAvailable(partition2, electResult)
-    TestUtils.assertLeader(client, partition2, 2)
+    assertLeader(client, partition2, 2)
   }
 
   @Test
@@ -3283,19 +3283,19 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     val partition1 = new TopicPartition("unclean-test-topic-1", 0)
     createTopicWithAssignment(partition1.topic, Map[Int, 
Seq[Int]](partition1.partition -> assignment1))
 
-    TestUtils.assertLeader(client, partition1, broker1)
+    assertLeader(client, partition1, broker1)
 
     killBroker(broker2)
-    TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
+    waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
     killBroker(broker1)
-    TestUtils.assertNoLeader(client, partition1)
+    assertNoLeader(client, partition1)
     brokers(broker2).startup()
-    TestUtils.waitForOnlineBroker(client, broker2)
+    waitForOnlineBroker(client, broker2)
 
     val electResult = client.electLeaders(ElectionType.UNCLEAN, 
util.Set.of(partition1))
     electResult.partitions.get.get(partition1)
       .ifPresent(t => fail(s"Unexpected exception during leader election: $t 
for partition $partition1"))
-    TestUtils.assertLeader(client, partition1, broker2)
+    assertLeader(client, partition1, broker2)
   }
 
   @Test
@@ -3318,24 +3318,24 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       Map(partition1.partition -> assignment1, partition2.partition -> 
assignment2)
     )
 
-    TestUtils.assertLeader(client, partition1, broker1)
-    TestUtils.assertLeader(client, partition2, broker1)
+    assertLeader(client, partition1, broker1)
+    assertLeader(client, partition2, broker1)
 
     killBroker(broker2)
-    TestUtils.waitForBrokersOutOfIsr(client, Set(partition1, partition2), 
Set(broker2))
+    waitForBrokersOutOfIsr(client, Set(partition1, partition2), Set(broker2))
     killBroker(broker1)
-    TestUtils.assertNoLeader(client, partition1)
-    TestUtils.assertNoLeader(client, partition2)
+    assertNoLeader(client, partition1)
+    assertNoLeader(client, partition2)
     brokers(broker2).startup()
-    TestUtils.waitForOnlineBroker(client, broker2)
+    waitForOnlineBroker(client, broker2)
 
     val electResult = client.electLeaders(ElectionType.UNCLEAN, 
util.Set.of(partition1, partition2))
     electResult.partitions.get.get(partition1)
       .ifPresent(t => fail(s"Unexpected exception during leader election: $t 
for partition $partition1"))
     electResult.partitions.get.get(partition2)
       .ifPresent(t => fail(s"Unexpected exception during leader election: $t 
for partition $partition2"))
-    TestUtils.assertLeader(client, partition1, broker2)
-    TestUtils.assertLeader(client, partition2, broker2)
+    assertLeader(client, partition1, broker2)
+    assertLeader(client, partition2, broker2)
   }
 
   @Test
@@ -3359,23 +3359,23 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       Map(partition1.partition -> assignment1, partition2.partition -> 
assignment2)
     )
 
-    TestUtils.assertLeader(client, partition1, broker1)
-    TestUtils.assertLeader(client, partition2, broker1)
+    assertLeader(client, partition1, broker1)
+    assertLeader(client, partition2, broker1)
 
     killBroker(broker2)
-    TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
+    waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
     killBroker(broker1)
-    TestUtils.assertNoLeader(client, partition1)
-    TestUtils.assertLeader(client, partition2, broker3)
+    assertNoLeader(client, partition1)
+    assertLeader(client, partition2, broker3)
     brokers(broker2).startup()
-    TestUtils.waitForOnlineBroker(client, broker2)
+    waitForOnlineBroker(client, broker2)
 
     val electResult = client.electLeaders(ElectionType.UNCLEAN, null)
     electResult.partitions.get.get(partition1)
       .ifPresent(t => fail(s"Unexpected exception during leader election: $t 
for partition $partition1"))
     assertFalse(electResult.partitions.get.containsKey(partition2))
-    TestUtils.assertLeader(client, partition1, broker2)
-    TestUtils.assertLeader(client, partition2, broker3)
+    assertLeader(client, partition1, broker2)
+    assertLeader(client, partition2, broker3)
   }
 
   @Test
@@ -3397,7 +3397,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       Map(0 -> assignment1)
     )
 
-    TestUtils.assertLeader(client, new TopicPartition(topic, 0), broker1)
+    assertLeader(client, new TopicPartition(topic, 0), broker1)
 
     val electResult = client.electLeaders(ElectionType.UNCLEAN, 
util.Set.of(unknownPartition, unknownTopic))
     
assertTrue(electResult.partitions.get.get(unknownPartition).get.isInstanceOf[UnknownTopicOrPartitionException])
@@ -3422,12 +3422,12 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       Map(partition1.partition -> assignment1)
     )
 
-    TestUtils.assertLeader(client, partition1, broker1)
+    assertLeader(client, partition1, broker1)
 
     killBroker(broker2)
-    TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
+    waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
     killBroker(broker1)
-    TestUtils.assertNoLeader(client, partition1)
+    assertNoLeader(client, partition1)
 
     val electResult = client.electLeaders(ElectionType.UNCLEAN, 
util.Set.of(partition1))
     
assertTrue(electResult.partitions.get.get(partition1).get.isInstanceOf[EligibleLeadersNotAvailableException])
@@ -3451,10 +3451,10 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       Map(partition1.partition -> assignment1)
     )
 
-    TestUtils.assertLeader(client, partition1, broker1)
+    assertLeader(client, partition1, broker1)
 
     killBroker(broker1)
-    TestUtils.assertLeader(client, partition1, broker2)
+    assertLeader(client, partition1, broker2)
     brokers(broker1).startup()
 
     val electResult = client.electLeaders(ElectionType.UNCLEAN, 
util.Set.of(partition1))
@@ -3482,23 +3482,23 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       Map(partition1.partition -> assignment1, partition2.partition -> 
assignment2)
     )
 
-    TestUtils.assertLeader(client, partition1, broker1)
-    TestUtils.assertLeader(client, partition2, broker1)
+    assertLeader(client, partition1, broker1)
+    assertLeader(client, partition2, broker1)
 
     killBroker(broker2)
-    TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
+    waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
     killBroker(broker1)
-    TestUtils.assertNoLeader(client, partition1)
-    TestUtils.assertLeader(client, partition2, broker3)
+    assertNoLeader(client, partition1)
+    assertLeader(client, partition2, broker3)
     brokers(broker2).startup()
-    TestUtils.waitForOnlineBroker(client, broker2)
+    waitForOnlineBroker(client, broker2)
 
     val electResult = client.electLeaders(ElectionType.UNCLEAN, 
util.Set.of(partition1, partition2))
     electResult.partitions.get.get(partition1)
       .ifPresent(t => fail(s"Unexpected exception during leader election: $t 
for partition $partition1"))
     
assertTrue(electResult.partitions.get.get(partition2).get.isInstanceOf[ElectionNotNeededException])
-    TestUtils.assertLeader(client, partition1, broker2)
-    TestUtils.assertLeader(client, partition2, broker3)
+    assertLeader(client, partition1, broker2)
+    assertLeader(client, partition2, broker3)
   }
 
   @Test
@@ -5061,4 +5061,82 @@ object PlaintextAdminIntegrationTest {
 
     assertEquals(ServerLogConfigs.COMPRESSION_TYPE_DEFAULT, 
configs.get(brokerResource).get(ServerConfigs.COMPRESSION_TYPE_CONFIG).value)
   }
+
+  private def waitForBrokersInIsr(client: Admin, partition: TopicPartition, 
brokerIds: Set[Int]): Unit = {
+    waitUntilTrue(
+      () => {
+        val isr = client.describeTopics(util.Set.of(partition.topic))
+          .allTopicNames
+          .get
+          .get(partition.topic)
+          .partitions.asScala
+          .filter(_.partition == partition.partition)
+          .flatMap(_.isr.asScala)
+          .map(_.id)
+          .toSet
+        brokerIds.subsetOf(isr)
+      },
+      s"Expected brokers $brokerIds to be in the ISR for $partition"
+    )
+  }
+
+  private def waitForBrokersOutOfIsr(client: Admin, partition: 
Set[TopicPartition], brokerIds: Set[Int]): Unit = {
+    waitUntilTrue(
+      () => {
+        val description = 
client.describeTopics(partition.map(_.topic).asJava).allTopicNames.get.asScala
+        val isr = description
+          .flatMap { case (topic, desc) =>
+            desc.partitions.asScala
+              .filter(info => partition.contains(new TopicPartition(topic, 
info.partition)))
+              .flatMap(_.isr.asScala)
+          }
+          .map(_.id)
+          .toSet
+
+        brokerIds.intersect(isr).isEmpty
+      },
+      s"Expected brokers $brokerIds to no longer be in the ISR for $partition"
+    )
+  }
+
+  private def waitForOnlineBroker(client: Admin, brokerId: Int): Unit = {
+    waitUntilTrue(() => {
+      val nodes = client.describeCluster().nodes().get()
+      nodes.asScala.exists(_.id == brokerId)
+    }, s"Timed out waiting for brokerId $brokerId to come online")
+  }
+
+  private def assertLeader(client: Admin, topicPartition: TopicPartition, 
expectedLeader: Int): Unit = {
+    waitForLeaderToBecome(client, topicPartition, Some(expectedLeader))
+  }
+
+  private def assertNoLeader(client: Admin, topicPartition: TopicPartition): 
Unit = {
+    waitForLeaderToBecome(client, topicPartition, None)
+  }
+
+  private def waitForLeaderToBecome(
+    client: Admin,
+    topicPartition: TopicPartition,
+    expectedLeaderOpt: Option[Int]
+  ): Unit = {
+    val topic = topicPartition.topic
+    val partitionId = topicPartition.partition
+
+    def currentLeader: Try[Option[Int]] = Try {
+      val topicDescription = 
client.describeTopics(util.List.of(topic)).allTopicNames.get.get(topic)
+      topicDescription.partitions.asScala
+        .find(_.partition == partitionId)
+        .flatMap(partitionState => Option(partitionState.leader))
+        .map(_.id)
+    }
+
+    val (lastLeaderCheck, isLeaderElected) = computeUntilTrue(currentLeader) {
+      case Success(leaderOpt) => leaderOpt == expectedLeaderOpt
+      case Failure(e: ExecutionException) if 
e.getCause.isInstanceOf[UnknownTopicOrPartitionException] => false
+      case Failure(e) => throw e
+    }
+
+    assertTrue(isLeaderElected, s"Timed out waiting for leader to become 
$expectedLeaderOpt. " +
+      s"Last metadata lookup returned leader = 
${lastLeaderCheck.getOrElse("unknown")}")
+  }
 }
diff --git 
a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala 
b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 7f37eeb25a1..4b6690a4fdc 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -24,14 +24,16 @@ import org.apache.kafka.common.acl.{AccessControlEntry, 
AccessControlEntryFilter
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
 import org.apache.kafka.common.resource.ResourcePattern
-import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.security.scram.ScramCredential
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{KafkaException, Uuid}
 import 
org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT
+import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, 
Authorizer => JAuthorizer}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 
 import java.io.File
+import java.net.InetAddress
 import java.time.Duration
 import java.util
 import java.util.Properties
@@ -214,6 +216,17 @@ abstract class KafkaServerTestHarness extends 
QuorumTestHarness {
     }
   }
 
+  val anonymousAuthorizableContext = new AuthorizableRequestContext() {
+    override def listenerName(): String = ""
+    override def securityProtocol(): SecurityProtocol = 
SecurityProtocol.PLAINTEXT
+    override def principal(): KafkaPrincipal = KafkaPrincipal.ANONYMOUS
+    override def clientAddress(): InetAddress = null
+    override def requestType(): Int = 0
+    override def requestVersion(): Int = 0
+    override def clientId(): String = ""
+    override def correlationId(): Int = 0
+  }
+
   def addAndVerifyAcls(acls: Set[AccessControlEntry], resource: 
ResourcePattern): Unit = {
     val authorizerForWrite = pickAuthorizerForWrite(brokers, controllerServers)
     val aclBindings = acls.map { acl => new AclBinding(resource, acl) }
@@ -371,4 +384,20 @@ abstract class KafkaServerTestHarness extends 
QuorumTestHarness {
       }
     }
   }
+
+  /**
+   * Find an Authorizer that we can call createAcls or deleteAcls on.
+   */
+  def pickAuthorizerForWrite[B <: KafkaBroker](brokers: Seq[B], controllers: 
Seq[ControllerServer]): JAuthorizer = {
+    if (controllers.isEmpty) {
+      brokers.head.authorizerPlugin.get.get
+    } else {
+      var result: JAuthorizer = null
+      TestUtils.retry(120000) {
+        val active = controllers.filter(_.controller.isActive).head
+        result = active.authorizerPlugin.get.get
+      }
+      result
+    }
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala 
b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index c51cba6a66f..e1aa985e01c 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -1229,7 +1229,7 @@ class LogCleanerTest extends Logging {
 
     // the last (active) segment has just one message
 
-    def distinctValuesBySegment = log.logSegments.asScala.map(s => 
s.log.records.asScala.map(record => 
TestUtils.readString(record.value)).toSet.size).toSeq
+    def distinctValuesBySegment = log.logSegments.asScala.map(s => 
s.log.records.asScala.map(record => 
Utils.utf8(record.value())).toSet.size).toSeq
 
     val distinctValuesBySegmentBeforeClean = distinctValuesBySegment
     assertTrue(distinctValuesBySegment.reverse.tail.forall(_ > N),
@@ -1927,7 +1927,7 @@ class LogCleanerTest extends Logging {
 
     for (segment <- log.logSegments.asScala; batch <- 
segment.log.batches.asScala; record <- batch.asScala) {
       assertTrue(record.hasMagic(batch.magic))
-      val value = TestUtils.readString(record.value).toLong
+      val value = Utils.utf8(record.value()).toLong
       assertEquals(record.offset, value)
     }
   }
@@ -1947,7 +1947,7 @@ class LogCleanerTest extends Logging {
 
     for (logEntry <- records.records.asScala) {
       val offset = logEntry.offset
-      val value = TestUtils.readString(logEntry.value).toLong
+      val value = Utils.utf8(logEntry.value()).toLong
       assertEquals(offset, value)
     }
   }
diff --git a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala 
b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
index 9875a3b2ce4..e5d34608a04 100644
--- a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
@@ -19,7 +19,6 @@ package kafka.log
 
 import java.io.File
 import java.util.Properties
-import kafka.utils.TestUtils
 import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.compress.Compression
 import org.apache.kafka.common.record.internal.{ControlRecordType, 
EndTransactionMarker, FileRecords, MemoryRecords, RecordBatch, SimpleRecord}
@@ -38,7 +37,7 @@ import org.apache.kafka.server.storage.log.FetchIsolation
 import org.apache.kafka.server.util.Scheduler
 import 
org.apache.kafka.storage.internals.log.LogConfig.{DEFAULT_REMOTE_LOG_COPY_DISABLE_CONFIG,
 DEFAULT_REMOTE_LOG_DELETE_ON_DISABLE_CONFIG}
 import org.apache.kafka.common.message.AbortedTxn
-import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, 
LazyIndex, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, 
LogOffsetsListener, LogSegment, ProducerStateManager, 
ProducerStateManagerConfig, TransactionIndex, VerificationGuard, UnifiedLog}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, 
LazyIndex, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, 
LogOffsetsListener, LogSegment, ProducerStateManager, 
ProducerStateManagerConfig, TransactionIndex, UnifiedLog, VerificationGuard}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 
 import scala.jdk.CollectionConverters._
@@ -202,7 +201,7 @@ object LogTestUtils {
     for (logSegment <- log.logSegments.asScala;
          batch <- logSegment.log.batches.asScala if !batch.isControlBatch;
          record <- batch.asScala if record.hasValue && record.hasKey)
-      yield TestUtils.readString(record.key).toLong
+      yield Utils.utf8(record.key()).toLong
   }
 
   def recoverAndCheck(logDir: File, config: LogConfig, expectedKeys: 
Iterable[Long], brokerTopicStats: BrokerTopicStats, time: Time, scheduler: 
Scheduler): UnifiedLog = {
@@ -308,4 +307,5 @@ object LogTestUtils {
       sequence += numRecords
     }
   }
+
 }
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala 
b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index 770ec432118..e30531779c2 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -36,11 +36,14 @@ import org.apache.kafka.common.metrics.JmxReporter
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.server.config.ServerLogConfigs
 import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, 
LinuxIoMetricsCollector}
+import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile
+import org.apache.kafka.storage.internals.log.UnifiedLog
 import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics
 import org.junit.jupiter.api.{Test, Timeout}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.MethodSource
 
+import java.io.File
 import scala.jdk.OptionConverters.RichOptional
 
 @Timeout(120)
@@ -64,7 +67,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging 
{
     val topic = "test-topic-metric"
     createTopic(topic)
     deleteTopic(topic)
-    TestUtils.verifyTopicDeletion(topic, 1, brokers)
+    verifyTopicDeletion(topic, brokers)
     assertEquals(Set.empty, topicMetricGroups(topic), "Topic metrics exists 
after deleteTopic")
   }
 
@@ -78,7 +81,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging 
{
     assertTrue(topicMetricGroups(topic).nonEmpty, "Topic metrics don't exist")
     brokers.foreach(b => assertNotNull(b.brokerTopicStats.topicStats(topic)))
     deleteTopic(topic)
-    TestUtils.verifyTopicDeletion(topic, 1, brokers)
+    verifyTopicDeletion(topic, brokers)
     assertEquals(Set.empty, topicMetricGroups(topic), "Topic metrics exists 
after deleteTopic")
   }
 
@@ -244,4 +247,41 @@ class MetricsTest extends KafkaServerTestHarness with 
Logging {
     val pattern = (".*BrokerTopicMetrics.*" + topic.map(t => 
s"($t)$$").getOrElse("")).r.pattern
     metrics.filter(pattern.matcher(_).matches())
   }
+
+  private def verifyTopicDeletion[B <: KafkaBroker](topic: String, brokers: 
Seq[B]): Unit = {
+    val topicPartitions = (0 until 1).map(new TopicPartition(topic, _))
+    // ensure that the topic-partition has been deleted from all brokers' 
replica managers
+    TestUtils.waitUntilTrue(() =>
+      brokers.forall(broker => topicPartitions.forall(tp => 
broker.replicaManager.onlinePartition(tp).isEmpty)),
+      "Replica manager's should have deleted all of this topic's partitions")
+    // ensure that logs from all replicas are deleted
+    TestUtils.waitUntilTrue(() => brokers.forall(broker => 
topicPartitions.forall(tp => broker.logManager.getLog(tp).isEmpty)),
+      "Replica logs not deleted after delete topic is complete")
+    // ensure that topic is removed from all cleaner offsets
+    TestUtils.waitUntilTrue(() => brokers.forall(broker => 
topicPartitions.forall { tp =>
+      val checkpoints = broker.logManager.liveLogDirs.asScala.map { logDir =>
+        new OffsetCheckpointFile(new File(logDir, 
"cleaner-offset-checkpoint"), null).read()
+      }
+      checkpoints.forall(checkpointsPerLogDir => 
!checkpointsPerLogDir.containsKey(tp))
+    }), "Cleaner offset for deleted partition should have been removed")
+    TestUtils.waitUntilTrue(() => brokers.forall(broker =>
+      broker.config.logDirs.stream().allMatch { logDir =>
+        topicPartitions.forall { tp =>
+          !new File(logDir, tp.topic + "-" + tp.partition).exists()
+        }
+      }
+    ), "Failed to soft-delete the data to a delete directory")
+    TestUtils.waitUntilTrue(() => brokers.forall(broker =>
+      broker.config.logDirs.stream().allMatch { logDir =>
+        topicPartitions.forall { tp =>
+          !util.List.of(new File(logDir).list()).asScala.exists { 
partitionDirectoryNames =>
+            partitionDirectoryNames.exists { directoryName =>
+              directoryName.startsWith(tp.topic + "-" + tp.partition) &&
+                directoryName.endsWith(UnifiedLog.DELETE_DIR_SUFFIX)
+            }
+          }
+        }
+      }
+    ), "Failed to hard-delete the delete directory")
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala 
b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
index cb66e18afff..800e6c2039d 100644
--- a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
+++ b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
@@ -20,7 +20,6 @@ package kafka.network
 import com.fasterxml.jackson.databind.ObjectMapper
 import kafka.network
 import kafka.server.EnvelopeUtils
-import kafka.utils.TestUtils
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.config.{ConfigResource, SaslConfigs, 
SslConfigs, TopicConfig}
@@ -29,7 +28,7 @@ import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData._
 import org.apache.kafka.common.message.{CreateTopicsRequestData, 
CreateTopicsResponseData, IncrementalAlterConfigsRequestData}
 import org.apache.kafka.common.network.{ClientInformation, ListenerName}
-import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.AlterConfigsRequest._
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, 
KafkaPrincipalSerde, SecurityProtocol}
@@ -48,6 +47,7 @@ import java.io.IOException
 import java.net.InetAddress
 import java.nio.ByteBuffer
 import java.util
+import java.util.Optional
 import java.util.concurrent.atomic.AtomicReference
 import scala.collection.Map
 import scala.jdk.CollectionConverters._
@@ -260,7 +260,7 @@ class RequestChannelTest {
   }
 
   private def buildUnwrappedEnvelopeRequest(request: AbstractRequest): 
RequestChannel.Request = {
-    val wrappedRequest = TestUtils.buildEnvelopeRequest(
+    val wrappedRequest = buildEnvelopeRequest(
       request,
       principalSerde,
       requestChannelMetrics,
@@ -278,6 +278,46 @@ class RequestChannelTest {
     unwrappedRequest.get()
   }
 
+  /**
+   * Wraps `request` in an ENVELOPE request issued by the anonymous principal over a
+   * PLAINTEXT listener and returns it as a [[RequestChannel.Request]].
+   *
+   * @param dequeueTimeNanos value stored in `requestDequeueTimeNanos` of the returned request
+   * @param fromPrivilegedListener flag handed to the envelope's [[RequestContext]]
+   */
+  def buildEnvelopeRequest(
+    request: AbstractRequest,
+    principalSerde: KafkaPrincipalSerde,
+    requestChannelMetrics: RequestChannelMetrics,
+    startTimeNanos: Long,
+    dequeueTimeNanos: Long = -1,
+    fromPrivilegedListener: Boolean = true
+  ): RequestChannel.Request = {
+    val clientId = "id"
+    val protocol = SecurityProtocol.PLAINTEXT
+    val listener = ListenerName.forSecurityProtocol(protocol)
+
+    // The inner request is serialized together with its own header and becomes the envelope payload.
+    val payload = request.serializeWithHeader(
+      new RequestHeader(request.apiKey, request.version, clientId, 0))
+
+    val header = new RequestHeader(ApiKeys.ENVELOPE, ApiKeys.ENVELOPE.latestVersion(), clientId, 0)
+    val serializedPrincipal = principalSerde.serialize(KafkaPrincipal.ANONYMOUS)
+    val envelopeBuffer = new EnvelopeRequest.Builder(payload, serializedPrincipal, InetAddress.getLocalHost.getAddress)
+      .build()
+      .serializeWithHeader(header)
+
+    // The parsed header is discarded on purpose; presumably the call is needed to move the buffer
+    // past the header bytes before RequestChannel.Request reads the body -- TODO confirm.
+    RequestHeader.parse(envelopeBuffer)
+    val context = new RequestContext(header, "1", InetAddress.getLocalHost, Optional.empty(), KafkaPrincipal.ANONYMOUS,
+      listener, protocol, ClientInformation.EMPTY, fromPrivilegedListener, Optional.of(principalSerde))
+    val envelopeRequest = new RequestChannel.Request(processor = 1, context = context, startTimeNanos = startTimeNanos,
+      memoryPool = MemoryPool.NONE, buffer = envelopeBuffer, metrics = requestChannelMetrics, envelope = None)
+    envelopeRequest.requestDequeueTimeNanos = dequeueTimeNanos
+    envelopeRequest
+  }
+
   private def isValidJson(str: String): Boolean = {
     try {
       val mapper = new ObjectMapper
diff --git 
a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
index 50bad2dfcd3..cd9afcc889b 100644
--- 
a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
@@ -33,6 +33,7 @@ import org.apache.kafka.common.serialization.StringSerializer
 import org.apache.kafka.common.test.ClusterInstance
 import org.apache.kafka.common.utils.ProducerIdAndEpoch
 import 
org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.server.IntegrationTestUtils
 import org.junit.jupiter.api.Assertions.{assertEquals, fail}
 
@@ -70,10 +71,14 @@ class GroupCoordinatorBaseRequestTest(cluster: 
ClusterInstance) {
   protected def createTransactionStateTopic(): Unit = {
     val admin = cluster.admin()
     try {
-      TestUtils.createTransactionStateTopicWithAdmin(
+      TestUtils.createTopicWithAdmin(
         admin = admin,
+        topic = Topic.TRANSACTION_STATE_TOPIC_NAME,
+        numPartitions = 
brokers().head.config.getInt(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG),
+        replicationFactor = 
brokers().head.config.getShort(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG).toInt,
         brokers = brokers(),
-        controllers = controllerServers()
+        controllers = controllerServers(),
+        topicConfig = new Properties()
       )
     } finally {
       admin.close()
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index d3009e989dc..be14bcb3314 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -17,6 +17,7 @@
 
 package kafka.server
 
+import com.yammer.metrics.core.{Histogram, Meter}
 import kafka.cluster.Partition
 import kafka.coordinator.transaction.{InitProducerIdResult, 
TransactionCoordinator}
 import kafka.network.RequestChannel
@@ -96,7 +97,7 @@ import org.apache.kafka.server.authorizer.{Action, 
AuthorizationResult, Authoriz
 import org.apache.kafka.server.common.{FeatureVersion, FinalizedFeatures, 
GroupVersion, KRaftVersion, MetadataVersion, RequestLocal, ShareVersion, 
StreamsVersion, TransactionVersion}
 import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs, 
ServerLogConfigs}
 import org.apache.kafka.server.logger.LoggingController
-import org.apache.kafka.server.metrics.ClientMetricsTestUtils
+import org.apache.kafka.server.metrics.{ClientMetricsTestUtils, 
KafkaYammerMetrics}
 import org.apache.kafka.server.share.{CachedSharePartition, 
ErroneousAndValidPartitionData, SharePartitionKey}
 import org.apache.kafka.server.quota.{ClientQuotaManager, 
ControllerMutationQuota, ControllerMutationQuotaManager, ReplicaQuota, 
ReplicationQuotaManager, ThrottleCallback}
 import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch
@@ -2262,12 +2263,22 @@ class KafkaApisTest extends Logging {
       assertEquals(expectedError, error)
 
       val metricName = if (version < 4) ApiKeys.ADD_PARTITIONS_TO_TXN.name 
else RequestMetrics.VERIFY_PARTITIONS_IN_TXN_METRIC_NAME
-      assertEquals(8, TestUtils.metersCount(metricName))
+      assertEquals(8, metersCount(metricName))
     } finally {
       requestMetrics.close()
     }
   }
 
+  /**
+   * Sums `count()` over every Histogram and Meter in the default Yammer registry whose
+   * MBean name ends with `metricName`; metrics of any other kind contribute nothing.
+   */
+  private def metersCount(metricName: String): Long =
+    KafkaYammerMetrics.defaultRegistry.allMetrics.asScala.iterator.collect {
+      case (name, histogram: Histogram) if name.getMBeanName.endsWith(metricName) => histogram.count()
+      case (name, meter: Meter) if name.getMBeanName.endsWith(metricName) => meter.count()
+    }.sum
+
   @ParameterizedTest
   @ApiKeyVersionsSource(apiKey = ApiKeys.ADD_PARTITIONS_TO_TXN)
   def testAddPartitionsToTxnOperationNotAttempted(version: Short): Unit = {
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index fc1f10fbbd7..b55661b95ef 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -16,8 +16,7 @@
  */
 package kafka.utils
 
-import com.yammer.metrics.core.{Histogram, Meter}
-import kafka.network.RequestChannel
+import com.yammer.metrics.core.Meter
 import kafka.security.JaasTestUtils
 import kafka.server._
 import kafka.utils.Implicits._
@@ -32,15 +31,12 @@ import org.apache.kafka.common.config.{ConfigException, 
ConfigResource}
 import org.apache.kafka.common.errors.{TopicExistsException, 
UnknownTopicOrPartitionException}
 import org.apache.kafka.common.header.Header
 import org.apache.kafka.common.internals.{Plugin, Topic}
-import org.apache.kafka.common.memory.MemoryPool
-import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.network.{ClientInformation, ConnectionMode, 
ListenerName}
-import org.apache.kafka.common.protocol.ApiKeys
-import org.apache.kafka.common.record.internal._
+import org.apache.kafka.common.network.{ConnectionMode, ListenerName}
 import org.apache.kafka.common.record.TimestampType
+import org.apache.kafka.common.record.internal._
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.resource.ResourcePattern
-import org.apache.kafka.common.security.auth.{KafkaPrincipal, 
KafkaPrincipalSerde, SecurityProtocol}
+import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.serialization._
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.utils.Utils.formatAddress
@@ -48,14 +44,12 @@ import 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.metadata.{ConfigRepository, LeaderAndIsr, 
MockConfigRepository}
 import org.apache.kafka.network.SocketServerConfigs
-import org.apache.kafka.network.metrics.RequestChannelMetrics
 import org.apache.kafka.raft.{KRaftConfigs, QuorumConfig}
-import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, 
Authorizer => JAuthorizer}
+import org.apache.kafka.server.authorizer.{Authorizer => JAuthorizer}
 import org.apache.kafka.server.config.{DelegationTokenManagerConfigs, 
ReplicationConfigs, ServerConfigs, ServerLogConfigs}
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.server.util.MockTime
-import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile
-import org.apache.kafka.storage.internals.log.{CleanerConfig, LogCleaner, 
LogConfig, LogDirFailureChannel, LogManager, ProducerStateManagerConfig, 
UnifiedLog}
+import org.apache.kafka.storage.internals.log._
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.apache.kafka.test.{TestUtils => JTestUtils}
 import org.junit.jupiter.api.Assertions._
@@ -63,9 +57,8 @@ import org.mockito.ArgumentMatchers.{any, anyBoolean}
 import org.mockito.Mockito
 
 import java.io._
-import java.net.InetAddress
 import java.nio._
-import java.nio.charset.{Charset, StandardCharsets}
+import java.nio.charset.StandardCharsets
 import java.nio.file.{Files, StandardOpenOption}
 import java.time.Duration
 import java.util
@@ -78,7 +71,6 @@ import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.jdk.CollectionConverters._
 import scala.jdk.OptionConverters.RichOption
 import scala.jdk.javaapi.OptionConverters
-import scala.util.{Failure, Success, Try}
 
 /**
  * Utility functions to help with testing
@@ -99,7 +91,6 @@ object TestUtils extends Logging {
 
   private val transactionStatusKey = "transactionStatus"
   private val committedValue : Array[Byte] = 
"committed".getBytes(StandardCharsets.UTF_8)
-  private val abortedValue : Array[Byte] = 
"aborted".getBytes(StandardCharsets.UTF_8)
 
   sealed trait LogDirFailureType
   case object Roll extends LogDirFailureType
@@ -131,11 +122,6 @@ object TestUtils extends Logging {
    */
   def tempFile(prefix: String, suffix: String): File = 
JTestUtils.tempFile(prefix, suffix)
 
-  def tempPropertiesFile(properties: Map[String, String]): File = {
-    val content = properties.map{case (k, v) => k + "=" + 
v}.mkString(System.lineSeparator())
-    JTestUtils.tempFile(content)
-  }
-
   /**
    * Create a test config for the provided parameters.
    *
@@ -440,23 +426,6 @@ object TestUtils extends Logging {
     )
   }
 
-  def createTransactionStateTopicWithAdmin[B <: KafkaBroker](
-    admin: Admin,
-    brokers: Seq[B],
-    controllers: Seq[ControllerServer]
-  ): Map[Int, Int] = {
-    val broker = brokers.head
-    createTopicWithAdmin(
-      admin = admin,
-      topic = Topic.TRANSACTION_STATE_TOPIC_NAME,
-      numPartitions = 
broker.config.getInt(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG),
-      replicationFactor = 
broker.config.getShort(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG).toInt,
-      brokers = brokers,
-      controllers = controllers,
-      topicConfig = new Properties(),
-    )
-  }
-
   def deleteTopicWithAdmin[B <: KafkaBroker](
     admin: Admin,
     topic: String,
@@ -576,7 +545,7 @@ object TestUtils extends Logging {
    *
    * @return The new leader (note that negative values are used to indicate 
conditions like NoLeader and
    *         LeaderDuringDelete).
-   * @throws AssertionError if the expected condition is not true within the 
timeout.
+   * @throws java.lang.AssertionError if the expected condition is not true 
within the timeout.
    */
   def waitUntilLeaderIsElectedOrChangedWithAdmin(
     admin: Admin,
@@ -597,17 +566,6 @@ object TestUtils extends Logging {
           }
         }
     }
-    doWaitUntilLeaderIsElectedOrChanged(getPartitionLeader, topic, partition, 
timeoutMs, oldLeaderOpt, newLeaderOpt)
-  }
-
-  private def doWaitUntilLeaderIsElectedOrChanged(
-    getPartitionLeader: (String, Int) => Option[Int],
-    topic: String,
-    partition: Int,
-    timeoutMs: Long,
-    oldLeaderOpt: Option[Int],
-    newLeaderOpt: Option[Int]
-  ): Int = {
     require(!(oldLeaderOpt.isDefined && newLeaderOpt.isDefined), "Can't define 
both the old and the new leader")
     val startTime = System.currentTimeMillis()
     val topicPartition = new TopicPartition(topic, partition)
@@ -804,21 +762,6 @@ object TestUtils extends Logging {
       .getOrElse(throw new AssertionError(s"Unable to locate follower for 
$topicPartition"))
   }
 
-   /**
-    * Wait until all brokers know about each other.
-    *
-    * @param brokers The Kafka brokers.
-    * @param timeout The amount of time waiting on this condition before 
assert to fail
-    */
-  def waitUntilBrokerMetadataIsPropagated[B <: KafkaBroker](
-      brokers: Seq[B],
-      timeout: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Unit = {
-    val expectedBrokerIds = brokers.map(_.config.brokerId).toSet
-    waitUntilTrue(() => brokers.forall(server =>
-      
expectedBrokerIds.forall(server.dataPlaneRequestProcessor.metadataCache.hasAliveBroker(_))
-    ), "Timed out waiting for broker metadata to propagate to all servers", 
timeout)
-  }
-
   /**
    * Wait until the expected number of partitions is in the metadata cache in 
each broker.
    *
@@ -1041,58 +984,6 @@ object TestUtils extends Logging {
     }
   }
 
-  def verifyTopicDeletion[B <: KafkaBroker](
-      topic: String,
-      numPartitions: Int,
-      brokers: Seq[B]): Unit = {
-    val topicPartitions = (0 until numPartitions).map(new 
TopicPartition(topic, _))
-    // ensure that the topic-partition has been deleted from all brokers' 
replica managers
-    waitUntilTrue(() =>
-      brokers.forall(broker => topicPartitions.forall(tp => 
broker.replicaManager.onlinePartition(tp).isEmpty)),
-      "Replica manager's should have deleted all of this topic's partitions")
-    // ensure that logs from all replicas are deleted
-    waitUntilTrue(() => brokers.forall(broker => topicPartitions.forall(tp => 
broker.logManager.getLog(tp).isEmpty)),
-      "Replica logs not deleted after delete topic is complete")
-    // ensure that topic is removed from all cleaner offsets
-    waitUntilTrue(() => brokers.forall(broker => topicPartitions.forall { tp =>
-      val checkpoints = broker.logManager.liveLogDirs.asScala.map { logDir =>
-        new OffsetCheckpointFile(new File(logDir, 
"cleaner-offset-checkpoint"), null).read()
-      }
-      checkpoints.forall(checkpointsPerLogDir => 
!checkpointsPerLogDir.containsKey(tp))
-    }), "Cleaner offset for deleted partition should have been removed")
-    waitUntilTrue(() => brokers.forall(broker =>
-      broker.config.logDirs.stream().allMatch { logDir =>
-        topicPartitions.forall { tp =>
-          !new File(logDir, tp.topic + "-" + tp.partition).exists()
-        }
-      }
-    ), "Failed to soft-delete the data to a delete directory")
-    waitUntilTrue(() => brokers.forall(broker =>
-      broker.config.logDirs.stream().allMatch { logDir =>
-        topicPartitions.forall { tp =>
-          !util.List.of(new File(logDir).list()).asScala.exists { 
partitionDirectoryNames =>
-            partitionDirectoryNames.exists { directoryName =>
-              directoryName.startsWith(tp.topic + "-" + tp.partition) &&
-                directoryName.endsWith(UnifiedLog.DELETE_DIR_SUFFIX)
-            }
-          }
-        }
-      }
-    ), "Failed to hard-delete the delete directory")
-  }
-
-  /**
-   * Translate the given buffer into a string
-   *
-   * @param buffer The buffer to translate
-   * @param encoding The encoding to use in translating bytes to characters
-   */
-  def readString(buffer: ByteBuffer, encoding: String = 
Charset.defaultCharset.toString): String = {
-    val bytes = new Array[Byte](buffer.remaining)
-    buffer.get(bytes)
-    new String(bytes, encoding)
-  }
-
   def waitAndVerifyAcls(expected: Set[AccessControlEntry],
                         authorizerPlugin: Plugin[JAuthorizer],
                         resource: ResourcePattern,
@@ -1112,7 +1003,7 @@ object TestUtils extends Logging {
       45000)
   }
 
-  def consumeTopicRecords[K, V, B <: KafkaBroker](
+  def consumeTopicRecords[B <: KafkaBroker](
       brokers: Seq[B],
       topic: String,
       numMessages: Int,
@@ -1223,7 +1114,7 @@ object TestUtils extends Logging {
       override def value() = if (willBeCommitted)
         committedValue
       else
-        abortedValue
+        "aborted".getBytes(StandardCharsets.UTF_8)
     }
     new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, key, value, 
util.Set.of(header))
   }
@@ -1270,86 +1161,6 @@ object TestUtils extends Logging {
     adminClient.incrementalAlterConfigs(configs)
   }
 
-  def assertLeader(client: Admin, topicPartition: TopicPartition, 
expectedLeader: Int): Unit = {
-    waitForLeaderToBecome(client, topicPartition, Some(expectedLeader))
-  }
-
-  def assertNoLeader(client: Admin, topicPartition: TopicPartition): Unit = {
-    waitForLeaderToBecome(client, topicPartition, None)
-  }
-
-  def waitForOnlineBroker(client: Admin, brokerId: Int): Unit = {
-    waitUntilTrue(() => {
-      val nodes = client.describeCluster().nodes().get()
-      nodes.asScala.exists(_.id == brokerId)
-    }, s"Timed out waiting for brokerId $brokerId to come online")
-  }
-
-  def waitForLeaderToBecome(
-    client: Admin,
-    topicPartition: TopicPartition,
-    expectedLeaderOpt: Option[Int]
-  ): Unit = {
-    val topic = topicPartition.topic
-    val partitionId = topicPartition.partition
-
-    def currentLeader: Try[Option[Int]] = Try {
-      val topicDescription = 
client.describeTopics(util.List.of(topic)).allTopicNames.get.get(topic)
-      topicDescription.partitions.asScala
-        .find(_.partition == partitionId)
-        .flatMap(partitionState => Option(partitionState.leader))
-        .map(_.id)
-    }
-
-    val (lastLeaderCheck, isLeaderElected) = computeUntilTrue(currentLeader) {
-      case Success(leaderOpt) => leaderOpt == expectedLeaderOpt
-      case Failure(e: ExecutionException) if 
e.getCause.isInstanceOf[UnknownTopicOrPartitionException] => false
-      case Failure(e) => throw e
-    }
-
-    assertTrue(isLeaderElected, s"Timed out waiting for leader to become 
$expectedLeaderOpt. " +
-      s"Last metadata lookup returned leader = 
${lastLeaderCheck.getOrElse("unknown")}")
-  }
-
-  def waitForBrokersOutOfIsr(client: Admin, partition: Set[TopicPartition], 
brokerIds: Set[Int]): Unit = {
-    waitUntilTrue(
-      () => {
-        val description = 
client.describeTopics(partition.map(_.topic).asJava).allTopicNames.get.asScala
-        val isr = description
-          .values
-          .flatMap(_.partitions.asScala.flatMap(_.isr.asScala))
-          .map(_.id)
-          .toSet
-
-        brokerIds.intersect(isr).isEmpty
-      },
-      s"Expected brokers $brokerIds to no longer be in the ISR for $partition"
-    )
-  }
-
-  def currentIsr(admin: Admin, partition: TopicPartition): Set[Int] = {
-    val description = admin.describeTopics(util.Set.of(partition.topic))
-      .allTopicNames
-      .get
-      .asScala
-
-    description
-      .values
-      .flatMap(_.partitions.asScala.flatMap(_.isr.asScala))
-      .map(_.id)
-      .toSet
-  }
-
-  def waitForBrokersInIsr(client: Admin, partition: TopicPartition, brokerIds: 
Set[Int]): Unit = {
-    waitUntilTrue(
-      () => {
-        val isr = currentIsr(client, partition)
-        brokerIds.subsetOf(isr)
-      },
-      s"Expected brokers $brokerIds to be in the ISR for $partition"
-    )
-  }
-
   def assertBadConfigContainingMessage(props: Properties, 
expectedExceptionContainsText: String): Unit = {
     try {
       KafkaConfig.fromProps(props)
@@ -1364,105 +1175,18 @@ object TestUtils extends Logging {
   }
 
   def totalMetricValue(broker: KafkaBroker, metricName: String): Long = {
-    totalMetricValue(broker.metrics, metricName)
-  }
-
-  def totalMetricValue(metrics: Metrics, metricName: String): Long = {
-    val allMetrics = metrics.metrics
+    val allMetrics = broker.metrics.metrics
     val total = allMetrics.values().asScala.filter(_.metricName().name() == 
metricName)
       .foldLeft(0.0)((total, metric) => total + 
metric.metricValue.asInstanceOf[Double])
     total.toLong
   }
 
   def meterCount(metricName: String): Long = {
-    meterCountOpt(metricName).getOrElse(fail(s"Unable to find metric 
$metricName"))
-  }
-
-  def meterCountOpt(metricName: String): Option[Long] = {
     KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
       .filter { case (k, _) => k.getMBeanName.endsWith(metricName) }
       .values
       .headOption
       .map(_.asInstanceOf[Meter].count)
-  }
-
-  def metersCount(metricName: String): Long = {
-    KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
-      .filter { case (k, _) => k.getMBeanName.endsWith(metricName) }
-      .values.map {
-        case histogram: Histogram => histogram.count()
-        case meter: Meter => meter.count()
-        case _ => 0
-      }.sum
-  }
-
-  /**
-   * Find an Authorizer that we can call createAcls or deleteAcls on.
-   */
-  def pickAuthorizerForWrite[B <: KafkaBroker](
-    brokers: Seq[B],
-    controllers: Seq[ControllerServer],
-  ): JAuthorizer = {
-    if (controllers.isEmpty) {
-      brokers.head.authorizerPlugin.get.get
-    } else {
-      var result: JAuthorizer = null
-      TestUtils.retry(120000) {
-        val active = controllers.filter(_.controller.isActive).head
-        result = active.authorizerPlugin.get.get
-      }
-      result
-    }
-  }
-
-  val anonymousAuthorizableContext = new AuthorizableRequestContext() {
-    override def listenerName(): String = ""
-    override def securityProtocol(): SecurityProtocol = 
SecurityProtocol.PLAINTEXT
-    override def principal(): KafkaPrincipal = KafkaPrincipal.ANONYMOUS
-    override def clientAddress(): InetAddress = null
-    override def requestType(): Int = 0
-    override def requestVersion(): Int = 0
-    override def clientId(): String = ""
-    override def correlationId(): Int = 0
-  }
-
-  def buildEnvelopeRequest(
-    request: AbstractRequest,
-    principalSerde: KafkaPrincipalSerde,
-    requestChannelMetrics: RequestChannelMetrics,
-    startTimeNanos: Long,
-    dequeueTimeNanos: Long = -1,
-    fromPrivilegedListener: Boolean = true
-  ): RequestChannel.Request = {
-    val clientId = "id"
-    val listenerName = 
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
-
-    val requestHeader = new RequestHeader(request.apiKey, request.version, 
clientId, 0)
-    val requestBuffer = request.serializeWithHeader(requestHeader)
-
-    val envelopeHeader = new RequestHeader(ApiKeys.ENVELOPE, 
ApiKeys.ENVELOPE.latestVersion(), clientId, 0)
-    val envelopeBuffer = new EnvelopeRequest.Builder(
-      requestBuffer,
-      principalSerde.serialize(KafkaPrincipal.ANONYMOUS),
-      InetAddress.getLocalHost.getAddress
-    ).build().serializeWithHeader(envelopeHeader)
-
-    RequestHeader.parse(envelopeBuffer)
-
-    val envelopeContext = new RequestContext(envelopeHeader, "1", 
InetAddress.getLocalHost, Optional.empty(),
-      KafkaPrincipal.ANONYMOUS, listenerName, SecurityProtocol.PLAINTEXT, 
ClientInformation.EMPTY,
-      fromPrivilegedListener, Optional.of(principalSerde))
-
-    val envelopRequest = new RequestChannel.Request(
-      processor = 1,
-      context = envelopeContext,
-      startTimeNanos = startTimeNanos,
-      memoryPool = MemoryPool.NONE,
-      buffer = envelopeBuffer,
-      metrics = requestChannelMetrics,
-      envelope = None
-    )
-    envelopRequest.requestDequeueTimeNanos = dequeueTimeNanos
-    envelopRequest
+      .getOrElse(fail(s"Unable to find metric $metricName"))
   }
 }
diff --git a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java 
b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java
index e815514aac3..29092e0c304 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java
@@ -72,8 +72,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import scala.jdk.javaapi.CollectionConverters;
-
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -791,7 +789,7 @@ public class ConfigCommandTest {
         addedConfigs.put("delete.retention.ms", "1000000");
         addedConfigs.put("min.insync.replicas", "2");
         if (file) {
-            File f = 
kafka.utils.TestUtils.tempPropertiesFile(CollectionConverters.asScala(addedConfigs));
+            File f = ToolsTestUtils.tempPropertiesFile(addedConfigs);
             filePath = f.getPath();
         }
 
@@ -1439,7 +1437,6 @@ public class ConfigCommandTest {
         return res;
     }
 
-
     static class DummyAdminClient extends MockAdminClient {
         public DummyAdminClient(Node node) {
             super(List.of(node), node);
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java 
b/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java
index 2f5f92a6ee7..d3d646cae7e 100644
--- a/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java
@@ -16,12 +16,12 @@
  */
 package org.apache.kafka.tools;
 
-import kafka.utils.TestUtils;
-
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.CreateTopicsResult;
 import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.test.ClusterInstance;
@@ -29,6 +29,7 @@ import org.apache.kafka.common.test.api.ClusterConfigProperty;
 import org.apache.kafka.common.test.api.ClusterTest;
 import org.apache.kafka.common.test.api.ClusterTestDefaults;
 import org.apache.kafka.common.utils.internals.Exit;
+import org.apache.kafka.test.TestUtils;
 
 import org.mockito.ArgumentCaptor;
 import org.mockito.MockedStatic;
@@ -38,6 +39,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -46,8 +48,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
-
-import scala.jdk.javaapi.CollectionConverters;
+import java.util.stream.Collectors;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -71,7 +72,7 @@ public class LeaderElectionCommandTest {
     }
 
     @ClusterTest
-    public void testAllTopicPartition() throws InterruptedException, 
ExecutionException {
+    public void testAllTopicPartition() throws Exception {
         String topic = "unclean-topic";
         int partition = 0;
         List<Integer> assignment = List.of(broker2, broker3);
@@ -83,16 +84,16 @@ public class LeaderElectionCommandTest {
 
             TopicPartition topicPartition = new TopicPartition(topic, 
partition);
 
-            TestUtils.assertLeader(client, topicPartition, broker2);
+            assertLeader(client, topicPartition, broker2);
             cluster.shutdownBroker(broker3);
-            TestUtils.waitForBrokersOutOfIsr(client,
-                    
CollectionConverters.asScala(List.of(topicPartition)).toSet(),
-                    CollectionConverters.asScala(List.of(broker3)).toSet()
+            waitForBrokersOutOfIsr(client,
+                    Set.of(topicPartition),
+                    Set.of(broker3)
             );
             cluster.shutdownBroker(broker2);
-            TestUtils.assertNoLeader(client, topicPartition);
+            assertNoLeader(client, topicPartition);
             cluster.startBroker(broker3);
-            TestUtils.waitForOnlineBroker(client, broker3);
+            waitForOnlineBroker(client, broker3);
 
             assertEquals(0, LeaderElectionCommand.mainNoExit(
                     "--bootstrap-server", cluster.bootstrapServers(),
@@ -100,7 +101,7 @@ public class LeaderElectionCommandTest {
                     "--all-topic-partitions"
             ));
 
-            TestUtils.assertLeader(client, topicPartition, broker3);
+            assertLeader(client, topicPartition, broker3);
         }
     }
 
@@ -178,7 +179,7 @@ public class LeaderElectionCommandTest {
     }
 
     @ClusterTest
-    public void testTopicPartition() throws InterruptedException, 
ExecutionException {
+    public void testTopicPartition() throws Exception {
         String topic = "unclean-topic";
         int partition = 0;
         List<Integer> assignment = List.of(broker2, broker3);
@@ -189,17 +190,17 @@ public class LeaderElectionCommandTest {
 
             TopicPartition topicPartition = new TopicPartition(topic, 
partition);
 
-            TestUtils.assertLeader(client, topicPartition, broker2);
+            assertLeader(client, topicPartition, broker2);
 
             cluster.shutdownBroker(broker3);
-            TestUtils.waitForBrokersOutOfIsr(client,
-                    
CollectionConverters.asScala(List.of(topicPartition)).toSet(),
-                    CollectionConverters.asScala(List.of(broker3)).toSet()
+            waitForBrokersOutOfIsr(client,
+                    Set.of(topicPartition),
+                    Set.of(broker3)
             );
             cluster.shutdownBroker(broker2);
-            TestUtils.assertNoLeader(client, topicPartition);
+            assertNoLeader(client, topicPartition);
             cluster.startBroker(broker3);
-            TestUtils.waitForOnlineBroker(client, broker3);
+            waitForOnlineBroker(client, broker3);
 
             assertEquals(0, LeaderElectionCommand.mainNoExit(
                     "--bootstrap-server", cluster.bootstrapServers(),
@@ -208,7 +209,7 @@ public class LeaderElectionCommandTest {
                     "--partition", Integer.toString(partition)
             ));
 
-            TestUtils.assertLeader(client, topicPartition, broker3);
+            assertLeader(client, topicPartition, broker3);
         }
     }
 
@@ -227,17 +228,17 @@ public class LeaderElectionCommandTest {
 
             TopicPartition topicPartition = new TopicPartition(topic, partition);
 
-            TestUtils.assertLeader(client, topicPartition, broker2);
+            assertLeader(client, topicPartition, broker2);
 
             cluster.shutdownBroker(broker3);
-            TestUtils.waitForBrokersOutOfIsr(client,
-                    CollectionConverters.asScala(List.of(topicPartition)).toSet(),
-                    CollectionConverters.asScala(List.of(broker3)).toSet()
+            waitForBrokersOutOfIsr(client,
+                    Set.of(topicPartition),
+                    Set.of(broker3)
             );
             cluster.shutdownBroker(broker2);
-            TestUtils.assertNoLeader(client, topicPartition);
+            assertNoLeader(client, topicPartition);
             cluster.startBroker(broker3);
-            TestUtils.waitForOnlineBroker(client, broker3);
+            waitForOnlineBroker(client, broker3);
 
             Path topicPartitionPath = tempTopicPartitionFile(List.of(topicPartition));
 
@@ -247,12 +248,12 @@ public class LeaderElectionCommandTest {
                     "--path-to-json-file", topicPartitionPath.toString()
             ));
 
-            TestUtils.assertLeader(client, topicPartition, broker3);
+            assertLeader(client, topicPartition, broker3);
         }
     }
 
     @ClusterTest
-    public void testPreferredReplicaElection() throws InterruptedException, ExecutionException {
+    public void testPreferredReplicaElection() throws Exception {
         String topic = "preferred-topic";
         int partition = 0;
         List<Integer> assignment = List.of(broker2, broker3);
@@ -266,14 +267,12 @@ public class LeaderElectionCommandTest {
 
             TopicPartition topicPartition = new TopicPartition(topic, partition);
 
-            TestUtils.assertLeader(client, topicPartition, broker2);
+            assertLeader(client, topicPartition, broker2);
 
             cluster.shutdownBroker(broker2);
-            TestUtils.assertLeader(client, topicPartition, broker3);
+            assertLeader(client, topicPartition, broker3);
             cluster.startBroker(broker2);
-            TestUtils.waitForBrokersInIsr(client, topicPartition,
-                    CollectionConverters.asScala(List.of(broker2)).toSet()
-            );
+            waitForBrokersInIsr(client, topicPartition, Set.of(broker2));
 
             assertEquals(0, LeaderElectionCommand.mainNoExit(
                     "--bootstrap-server", cluster.bootstrapServers(),
@@ -282,7 +281,7 @@ public class LeaderElectionCommandTest {
                     "--partition", Integer.toString(partition)
             ));
 
-            TestUtils.assertLeader(client, topicPartition, broker2);
+            assertLeader(client, topicPartition, broker2);
         }
     }
 
@@ -319,18 +318,14 @@ public class LeaderElectionCommandTest {
             topicPartition0 = new TopicPartition(topic, partition0);
             topicPartition1 = new TopicPartition(topic, partition1);
 
-            TestUtils.assertLeader(client, topicPartition0, broker2);
-            TestUtils.assertLeader(client, topicPartition1, broker3);
+            assertLeader(client, topicPartition0, broker2);
+            assertLeader(client, topicPartition1, broker3);
 
             cluster.shutdownBroker(broker2);
-            TestUtils.assertLeader(client, topicPartition0, broker3);
+            assertLeader(client, topicPartition0, broker3);
             cluster.startBroker(broker2);
-            TestUtils.waitForBrokersInIsr(client, topicPartition0,
-                    CollectionConverters.asScala(List.of(broker2)).toSet()
-            );
-            TestUtils.waitForBrokersInIsr(client, topicPartition1,
-                    CollectionConverters.asScala(List.of(broker2)).toSet()
-            );
+            waitForBrokersInIsr(client, topicPartition0, Set.of(broker2));
+            waitForBrokersInIsr(client, topicPartition1, Set.of(broker2));
         }
 
         Path topicPartitionPath = tempTopicPartitionFile(List.of(topicPartition0, topicPartition1));
@@ -396,4 +391,77 @@ public class LeaderElectionCommandTest {
         sb.append("]}");
         return sb.toString();
     }
+
+    private void waitForBrokersOutOfIsr(
+            Admin client, 
+            Set<TopicPartition> partitions, 
+            Set<Integer> brokerIds
+    ) throws InterruptedException {
+        TestUtils.waitForCondition(
+                () -> {
+                    Set<String> topics = partitions.stream()
+                            .map(TopicPartition::topic)
+                            .collect(Collectors.toSet());
+
+                    Map<String, TopicDescription> description = client.describeTopics(topics).allTopicNames().get();
+
+                    Set<Integer> isr = description.entrySet().stream()
+                            .flatMap(e -> e.getValue().partitions().stream()
+                                    .filter(info -> partitions.contains(new TopicPartition(e.getKey(), info.partition())))
+                                    .flatMap(info -> info.isr().stream()))
+                            .map(Node::id)
+                            .collect(Collectors.toSet());
+
+                    return Collections.disjoint(brokerIds, isr);
+                },
+                "Expected brokers " + brokerIds + " to no longer be in the ISR for " + partitions
+        );
+    }
+
+    private void waitForBrokersInIsr(Admin client, TopicPartition partition, Set<Integer> brokerIds) throws InterruptedException {
+        TestUtils.waitForCondition(
+                () -> {
+                    Set<Integer> isr = client.describeTopics(Set.of(partition.topic()))
+                            .allTopicNames()
+                            .get()
+                            .get(partition.topic())
+                            .partitions().stream()
+                            .filter(info -> info.partition() == partition.partition())
+                            .flatMap(info -> info.isr().stream())
+                            .map(Node::id)
+                            .collect(Collectors.toSet());
+
+                    return isr.containsAll(brokerIds);
+                },
+                "Expected brokers " + brokerIds + " to be in the ISR for " + partition
+        );
+    }
+
+    private void waitForOnlineBroker(Admin client, int brokerId) throws InterruptedException {
+        TestUtils.waitForCondition(
+                () -> client.describeCluster().nodes().get().stream()
+                        .anyMatch(node -> node.id() == brokerId),
+                "Timed out waiting for brokerId " + brokerId + " to come online"
+        );
+    }
+
+    private void assertLeader(Admin client, TopicPartition topicPartition, int expectedLeader) throws Exception {
+        int leader = cluster.waitUntilLeaderIsElectedOrChangedWithAdmin(client, topicPartition.topic(), topicPartition.partition(), 30000);
+        assertEquals(expectedLeader, leader);
+    }
+
+    private void assertNoLeader(Admin client, TopicPartition topicPartition) throws InterruptedException {
+        TestUtils.waitForCondition(
+                () -> {
+                    TopicDescription desc = client.describeTopics(List.of(topicPartition.topic()))
+                            .allTopicNames().get().get(topicPartition.topic());
+                    return desc.partitions().stream()
+                            .filter(p -> p.partition() == topicPartition.partition())
+                            .findFirst()
+                            .map(p -> p.leader() == null || p.leader().id() == Node.noNode().id())
+                            .orElse(false);
+                },
+                "Timed out waiting for no leader for " + topicPartition
+        );
+    }
 }

Reply via email to