This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6278e101000 KAFKA-20252: Migrate clearYammerMetrics to TestUtils
(#21679)
6278e101000 is described below
commit 6278e10100064ec873e9194f704c6e00fb0c6b1b
Author: Kartikay Dubey <[email protected]>
AuthorDate: Tue Apr 14 18:36:19 2026 +0530
KAFKA-20252: Migrate clearYammerMetrics to TestUtils (#21679)
Reviewers: Chia-Ping Tsai <[email protected]>, Mickael Maison
<[email protected]>
Co-authored-by: Kartikay Dubey <[email protected]>
---
checkstyle/import-control-server-common.xml | 3 ++
checkstyle/import-control-storage.xml | 1 +
.../kafka/clients/consumer/ShareConsumerTest.java | 3 +-
.../kafka/server/share/DelayedShareFetchTest.java | 3 +-
.../server/share/SharePartitionManagerTest.java | 3 +-
.../kafka/server/share/SharePartitionTest.java | 5 ++-
.../kafka/server/QuorumTestHarness.scala | 3 +-
.../unit/kafka/cluster/AbstractPartitionTest.scala | 5 ++-
.../unit/kafka/network/ConnectionQuotasTest.scala | 5 ++-
.../unit/kafka/network/SocketServerTest.scala | 5 ++-
.../kafka/security/authorizer/AuthorizerTest.scala | 3 +-
.../kafka/server/AbstractFetcherManagerTest.scala | 4 +-
.../kafka/server/AbstractFetcherThreadTest.scala | 5 ++-
.../unit/kafka/server/BrokerMetricNamesTest.scala | 4 +-
.../unit/kafka/server/DelayedProduceTest.scala | 4 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 3 +-
.../kafka/server/ReplicaFetcherThreadTest.scala | 3 +-
.../unit/kafka/server/ReplicaManagerTest.scala | 3 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 5 ---
.../apache/kafka/server/util/ServerTestUtils.java | 49 ++++++++++++++++++++++
.../server/share/fetch/ShareFetchTestUtils.java | 29 -------------
.../share/session/ShareSessionCacheTest.java | 4 +-
.../log/remote/storage/RemoteLogManagerTest.java | 17 ++------
.../log/remote/storage/RemoteLogReaderTest.java | 4 +-
.../purgatory/DelayedRemoteListOffsetsTest.java | 4 +-
.../internals/log/LogCleanerIntegrationTest.java | 3 +-
.../storage/internals/log/UnifiedLogTest.java | 12 +-----
27 files changed, 103 insertions(+), 89 deletions(-)
diff --git a/checkstyle/import-control-server-common.xml
b/checkstyle/import-control-server-common.xml
index 960c452b1e5..92da434568e 100644
--- a/checkstyle/import-control-server-common.xml
+++ b/checkstyle/import-control-server-common.xml
@@ -129,6 +129,9 @@
<allow
class="org.apache.kafka.server.util.TopicFilter.IncludeList" />
<allow class="org.apache.kafka.test.TestUtils" />
+ <!-- ServerTestUtils uses yammer metrics for test cleanup -->
+ <allow pkg="org.apache.kafka.server.metrics" />
+ <allow pkg="com.yammer.metrics.core" />
<subpackage name="timer">
<allow class="org.apache.kafka.server.util.MockTime" />
<allow class="org.apache.kafka.server.util.ShutdownableThread"
/>
diff --git a/checkstyle/import-control-storage.xml
b/checkstyle/import-control-storage.xml
index 6bb5d76bcf8..040e3761e37 100644
--- a/checkstyle/import-control-storage.xml
+++ b/checkstyle/import-control-storage.xml
@@ -84,6 +84,7 @@
<subpackage name="purgatory">
<allow pkg="kafka.utils" />
<allow pkg="org.apache.kafka.server.storage.log" />
+ <allow pkg="org.apache.kafka.server.util" />
</subpackage>
</subpackage>
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
index 051a4ba612f..deac9011c78 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
@@ -76,6 +76,7 @@ import
org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.apache.kafka.server.share.SharePartitionKey;
+import org.apache.kafka.server.util.ServerTestUtils;
import org.apache.kafka.test.NoRetryException;
import org.apache.kafka.test.TestUtils;
@@ -184,7 +185,7 @@ public class ShareConsumerTest {
@AfterEach
public void tearDown() {
- kafka.utils.TestUtils.clearYammerMetrics();
+ ServerTestUtils.clearYammerMetrics();
}
@ClusterTest
diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
index e34589f5a7b..e5d95ff38c0 100644
--- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
+++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
@@ -44,6 +44,7 @@ import
org.apache.kafka.server.share.metrics.ShareGroupMetrics;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.storage.log.FetchParams;
import org.apache.kafka.server.util.MockTime;
+import org.apache.kafka.server.util.ServerTestUtils;
import org.apache.kafka.server.util.timer.SystemTimer;
import org.apache.kafka.server.util.timer.SystemTimerReaper;
import org.apache.kafka.server.util.timer.Timer;
@@ -123,7 +124,7 @@ public class DelayedShareFetchTest {
@BeforeEach
public void setUp() {
- kafka.utils.TestUtils.clearYammerMetrics();
+ ServerTestUtils.clearYammerMetrics();
mockTimer = new SystemTimerReaper("DelayedShareFetchTestReaper",
new SystemTimer("DelayedShareFetchTestTimer"));
}
diff --git
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index 80f941d488e..45d29fd79b3 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -78,6 +78,7 @@ import org.apache.kafka.server.share.session.ShareSessionKey;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.storage.log.FetchParams;
import org.apache.kafka.server.util.MockTime;
+import org.apache.kafka.server.util.ServerTestUtils;
import org.apache.kafka.server.util.timer.MockTimer;
import org.apache.kafka.server.util.timer.SystemTimer;
import org.apache.kafka.server.util.timer.SystemTimerReaper;
@@ -173,7 +174,7 @@ public class SharePartitionManagerTest {
@BeforeEach
public void setUp() {
time = new MockTime();
- kafka.utils.TestUtils.clearYammerMetrics();
+ ServerTestUtils.clearYammerMetrics();
brokerTopicStats = new BrokerTopicStats();
mockReplicaManager = mock(ReplicaManager.class);
Partition partition = mockPartition();
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index c850f9e85a6..2cbb103349a 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -100,7 +100,8 @@ import java.util.function.Supplier;
import static kafka.server.share.SharePartition.EMPTY_MEMBER_ID;
import static
org.apache.kafka.server.share.fetch.ShareFetchTestUtils.memoryRecordsBuilder;
-import static
org.apache.kafka.server.share.fetch.ShareFetchTestUtils.yammerMetricValue;
+import static org.apache.kafka.server.util.ServerTestUtils.clearYammerMetrics;
+import static org.apache.kafka.server.util.ServerTestUtils.yammerMetricValue;
import static org.apache.kafka.test.TestUtils.assertFutureThrows;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -138,7 +139,7 @@ public class SharePartitionTest {
@BeforeEach
public void setUp() {
- kafka.utils.TestUtils.clearYammerMetrics();
+ clearYammerMetrics();
mockTimer = new MockTimer();
sharePartitionMetrics = new SharePartitionMetrics(GROUP_ID,
TOPIC_ID_PARTITION.topic(), TOPIC_ID_PARTITION.partition());
}
diff --git
a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index d3821ee0eee..49ae061e4f3 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -43,6 +43,7 @@ import org.apache.kafka.server.{ClientMetricsManager,
ServerSocketFactory}
import org.apache.kafka.server.common.{MetadataVersion, TransactionVersion}
import org.apache.kafka.server.config.{ServerConfigs, ServerLogConfigs}
import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler}
+import org.apache.kafka.server.util.ServerTestUtils
import org.apache.kafka.server.util.timer.SystemTimer
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeAll, BeforeEach, Tag,
TestInfo}
@@ -349,7 +350,7 @@ abstract class QuorumTestHarness extends Logging {
}
Exit.resetExitProcedure()
Exit.resetHaltProcedure()
- TestUtils.clearYammerMetrics()
+ ServerTestUtils.clearYammerMetrics()
System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
Configuration.setConfiguration(null)
faultHandler.maybeRethrowFirstException()
diff --git a/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
b/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
index 2375edb43ba..8330b7e25cc 100644
--- a/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
@@ -24,6 +24,7 @@ import org.apache.kafka.metadata.{LeaderRecoveryState,
MetadataCache, MockConfig
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.config.ReplicationConfigs
import org.apache.kafka.server.partition.AlterPartitionListener
+import org.apache.kafka.server.util.ServerTestUtils
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.server.util.MockAlterPartitionManager
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints
@@ -65,7 +66,7 @@ class AbstractPartitionTest {
@BeforeEach
def setup(): Unit = {
- TestUtils.clearYammerMetrics()
+ ServerTestUtils.clearYammerMetrics()
val logProps = createLogProperties(Map.empty)
logConfig = new LogConfig(logProps)
@@ -111,7 +112,7 @@ class AbstractPartitionTest {
if (tmpDir.exists()) {
logManager.shutdown()
Utils.delete(tmpDir)
- TestUtils.clearYammerMetrics()
+ ServerTestUtils.clearYammerMetrics()
}
}
diff --git a/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
b/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
index 0c0d7581e81..90569a9b155 100644
--- a/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
@@ -33,6 +33,7 @@ import org.apache.kafka.common.utils.Time
import org.apache.kafka.network.{ConnectionThrottledException, SocketServer,
SocketServerConfigs, TooManyConnectionsException}
import org.apache.kafka.server.config.{QuotaConfig, ReplicationConfigs}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
+import org.apache.kafka.server.util.ServerTestUtils
import org.apache.kafka.server.util.MockTime
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api._
@@ -86,7 +87,7 @@ class ConnectionQuotasTest {
@BeforeEach
def setUp(): Unit = {
// Clean-up any metrics left around by previous tests
- TestUtils.clearYammerMetrics()
+ ServerTestUtils.clearYammerMetrics()
val metricsPackage = "kafka.network"
val metricsClassName = "ConnectionQuotasTest"
@@ -109,7 +110,7 @@ class ConnectionQuotasTest {
connectionQuotas.close()
}
metrics.close()
- TestUtils.clearYammerMetrics()
+ ServerTestUtils.clearYammerMetrics()
blockedPercentMeters.clear()
}
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 6a4b8d8ca67..a90590d04f4 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -44,6 +44,7 @@ import org.apache.kafka.server.config.QuotaConfig
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.network.ConnectionDisconnectListener
import org.apache.kafka.server.quota.{ThrottleCallback, ThrottledChannel}
+import org.apache.kafka.server.util.ServerTestUtils
import org.apache.kafka.test.{TestSslUtils, TestUtils => JTestUtils}
import org.apache.logging.log4j.{Level, LogManager}
import org.apache.logging.log4j.core.config.Configurator
@@ -82,7 +83,7 @@ class SocketServerTest {
val localAddress = InetAddress.getLoopbackAddress
// Clean-up any metrics left around by previous tests
- TestUtils.clearYammerMetrics()
+ ServerTestUtils.clearYammerMetrics()
private val apiVersionManager = new
SimpleApiVersionManager(ListenerType.BROKER, true,
() => new FinalizedFeatures(MetadataVersion.latestTesting(),
util.Map.of[String, java.lang.Short], 0))
@@ -113,7 +114,7 @@ class SocketServerTest {
sockets.foreach(_.close())
sockets.clear()
Configurator.setLevel(kafkaLogger.getName, logLevelToRestore)
- TestUtils.clearYammerMetrics()
+ ServerTestUtils.clearYammerMetrics()
}
def sendRequest(socket: Socket, request: Array[Byte], id: Option[Short] =
None, flush: Boolean = true): Unit = {
diff --git
a/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala
b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala
index 0839990868f..50b405ecd16 100644
--- a/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala
@@ -38,6 +38,7 @@ import org.apache.kafka.metadata.authorizer.StandardAuthorizer
import
org.apache.kafka.metadata.authorizer.StandardAuthorizerTest.AuthorizerTestServerInfo
import org.apache.kafka.security.authorizer.AclEntry.{WILDCARD_HOST,
WILDCARD_PRINCIPAL_STRING}
import org.apache.kafka.server.authorizer._
+import org.apache.kafka.server.util.ServerTestUtils
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.api.Test
@@ -97,7 +98,7 @@ class AuthorizerTest extends QuorumTestHarness with
BaseAuthorizerTest {
override def tearDown(): Unit = {
authorizer1.close()
authorizer2.close()
- TestUtils.clearYammerMetrics()
+ ServerTestUtils.clearYammerMetrics()
super.tearDown()
}
diff --git
a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
index a58134dd90f..6936a264bc1 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
@@ -17,7 +17,6 @@
package kafka.server
import com.yammer.metrics.core.Gauge
-import kafka.utils.TestUtils
import org.apache.kafka.common.message.{FetchResponseData,
OffsetForLeaderEpochRequestData}
import
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
import org.apache.kafka.common.metrics.Metrics
@@ -33,6 +32,7 @@ import org.apache.kafka.server.ResultWithPartitions
import org.apache.kafka.server.PartitionFetchState
import org.apache.kafka.server.LeaderEndPoint
import org.apache.kafka.server.quota.ReplicationQuotaManager
+import org.apache.kafka.server.util.ServerTestUtils
import org.apache.kafka.storage.internals.log.LogAppendInfo
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions._
@@ -48,7 +48,7 @@ class AbstractFetcherManagerTest {
@BeforeEach
def cleanMetricRegistry(): Unit = {
- TestUtils.clearYammerMetrics()
+ ServerTestUtils.clearYammerMetrics()
}
private def getMetricValue(name: String): Any = {
diff --git
a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index ea01994d411..305ae8bef6d 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -33,6 +33,7 @@ import
kafka.server.FetcherThreadTestUtils.{initialFetchState, mkBatch}
import org.apache.kafka.common.message.{FetchResponseData,
OffsetForLeaderEpochRequestData}
import
org.apache.kafka.server.log.remote.storage.RetriableRemoteStorageException
import org.apache.kafka.server.{PartitionFetchState, ReplicaState}
+import org.apache.kafka.server.util.ServerTestUtils
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
@@ -53,7 +54,7 @@ class AbstractFetcherThreadTest {
@BeforeEach
def cleanMetricRegistry(): Unit = {
- TestUtils.clearYammerMetrics()
+ ServerTestUtils.clearYammerMetrics()
}
private def allMetricsNames: Set[String] =
KafkaYammerMetrics.defaultRegistry().allMetrics().asScala.keySet.map(_.getName)
@@ -3850,4 +3851,4 @@ class AbstractFetcherThreadTest {
// LogEndOffset is unchanged
assertEquals(0, replicaState.logEndOffset)
}
-}
\ No newline at end of file
+}
diff --git a/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala
b/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala
index 300bc2d8d42..00bf7f1415f 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala
@@ -18,9 +18,9 @@
package kafka.server
import org.apache.kafka.common.test.api.ClusterTest
-import kafka.utils.TestUtils
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.server.metrics.KafkaYammerMetrics
+import org.apache.kafka.server.util.ServerTestUtils
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Assertions.assertEquals
@@ -29,7 +29,7 @@ import scala.jdk.CollectionConverters._
class BrokerMetricNamesTest(cluster: ClusterInstance) {
@AfterEach
def tearDown(): Unit = {
- TestUtils.clearYammerMetrics()
+ ServerTestUtils.clearYammerMetrics()
}
@ClusterTest
diff --git a/core/src/test/scala/unit/kafka/server/DelayedProduceTest.scala
b/core/src/test/scala/unit/kafka/server/DelayedProduceTest.scala
index 25a044c6d08..217b9a15a74 100644
--- a/core/src/test/scala/unit/kafka/server/DelayedProduceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelayedProduceTest.scala
@@ -17,10 +17,10 @@
package kafka.server
-import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.purgatory.DelayedProduce
+import org.apache.kafka.server.util.ServerTestUtils
import org.junit.jupiter.api.{AfterEach, Test}
import org.junit.jupiter.api.Assertions._
@@ -30,7 +30,7 @@ class DelayedProduceTest {
@AfterEach
def tearDown(): Unit = {
- TestUtils.clearYammerMetrics()
+ ServerTestUtils.clearYammerMetrics()
}
@Test
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 814fb7dbd22..3237abade89 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -102,6 +102,7 @@ import
org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch
import org.apache.kafka.server.share.context.{FinalContext,
ShareSessionContext}
import org.apache.kafka.server.share.session.{ShareSession, ShareSessionKey}
import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
+import org.apache.kafka.server.util.ServerTestUtils
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig,
RecordValidationStats, UnifiedLog}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
@@ -162,7 +163,7 @@ class KafkaApisTest extends Logging {
Utils.swallow(this.logger.underlying, () => quotas.shutdown())
if (kafkaApis != null)
Utils.swallow(this.logger.underlying, () => kafkaApis.close())
- TestUtils.clearYammerMetrics()
+ ServerTestUtils.clearYammerMetrics()
metrics.close()
}
diff --git
a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index 1b1a62557b4..c23d8ec8f07 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -38,6 +38,7 @@ import org.apache.kafka.server.network.BrokerEndPoint
import org.apache.kafka.server.ReplicaState
import org.apache.kafka.server.PartitionFetchState
import org.apache.kafka.server.config.ReplicationConfigs
+import org.apache.kafka.server.util.ServerTestUtils
import org.apache.kafka.server.quota.{ReplicaQuota, ReplicationQuotaManager}
import org.apache.kafka.storage.internals.log.{LogAppendInfo, LogConfig,
LogManager, RecordValidationStats, UnifiedLog}
@@ -82,7 +83,7 @@ class ReplicaFetcherThreadTest {
@AfterEach
def cleanup(): Unit = {
- TestUtils.clearYammerMetrics()
+ ServerTestUtils.clearYammerMetrics()
}
private def createReplicaFetcherThread(
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index b38e433b292..9639d1e46aa 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -76,6 +76,7 @@ import org.apache.kafka.server.storage.log.{FetchIsolation,
FetchParams, FetchPa
import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
import
org.apache.kafka.server.transaction.AddPartitionsToTxnManager.TransactionSupportedOperation
import
org.apache.kafka.server.transaction.AddPartitionsToTxnManager.TransactionSupportedOperation.{ADD_PARTITION,
GENERIC_ERROR_SUPPORTED}
+import org.apache.kafka.server.util.ServerTestUtils
import org.apache.kafka.server.util.timer.{MockTimer, SystemTimer}
import org.apache.kafka.server.util.{MockScheduler, MockTime, Scheduler}
import org.apache.kafka.storage.internals.checkpoint.LazyOffsetCheckpoints
@@ -108,7 +109,7 @@ import scala.jdk.OptionConverters.{RichOption, RichOptional}
object ReplicaManagerTest {
@AfterAll
def tearDownClass(): Unit = {
- TestUtils.clearYammerMetrics()
+ ServerTestUtils.clearYammerMetrics()
}
}
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index c0bc7618a57..fc1f10fbbd7 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1396,11 +1396,6 @@ object TestUtils extends Logging {
}.sum
}
- def clearYammerMetrics(): Unit = {
- for (metricName <-
KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala)
- KafkaYammerMetrics.defaultRegistry.removeMetric(metricName)
- }
-
/**
* Find an Authorizer that we can call createAcls or deleteAcls on.
*/
diff --git
a/server-common/src/test/java/org/apache/kafka/server/util/ServerTestUtils.java
b/server-common/src/test/java/org/apache/kafka/server/util/ServerTestUtils.java
new file mode 100644
index 00000000000..f3898848768
--- /dev/null
+++
b/server-common/src/test/java/org/apache/kafka/server/util/ServerTestUtils.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.util;
+
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+
+import com.yammer.metrics.core.Gauge;
+
+public final class ServerTestUtils {
+
+ /**
+ * Remove all metrics registered in the default Kafka yammer registry.
+ */
+ public static void clearYammerMetrics() {
+ KafkaYammerMetrics.defaultRegistry().allMetrics().keySet().forEach(
+ metricName ->
KafkaYammerMetrics.defaultRegistry().removeMetric(metricName));
+ }
+
+ /**
+ * Fetch the gauge value of the first metric whose MBean name contains name.
+ *
+ * @param name Text that the MBean name of the metric must contain.
+ * @return The gauge value as a number; throws if no metric matches.
+ */
+ public static Number yammerMetricValue(String name) {
+ Gauge<?> gauge = (Gauge<?>)
KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream()
+ .filter(e -> e.getKey().getMBeanName().contains(name))
+ .findFirst()
+ .orElseThrow()
+ .getValue();
+ return (Number) gauge.value();
+ }
+
+}
diff --git
a/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTestUtils.java
b/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTestUtils.java
index 1fdb313fa02..0687c957fa1 100644
---
a/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTestUtils.java
+++
b/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTestUtils.java
@@ -23,11 +23,8 @@ import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.record.internal.FileRecords;
import org.apache.kafka.common.record.internal.MemoryRecords;
import org.apache.kafka.common.record.internal.MemoryRecordsBuilder;
-import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.apache.kafka.test.TestUtils;
-import com.yammer.metrics.core.Gauge;
-
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
@@ -126,31 +123,5 @@ public class ShareFetchTestUtils {
);
}
- /**
- * Fetch the gauge value from the yammer metrics.
- *
- * @param name The name of the metric.
- * @return The gauge value as a number.
- */
- public static Number yammerMetricValue(String name) {
- try {
- Gauge gauge = (Gauge)
KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream()
- .filter(e -> e.getKey().getMBeanName().contains(name))
- .findFirst()
- .orElseThrow()
- .getValue();
- return (Number) gauge.value();
- } catch (Exception e) {
- return 0;
- }
- }
- /**
- * Clear all the yammer metrics.
- */
- public static void clearYammerMetrics() {
- KafkaYammerMetrics.defaultRegistry().allMetrics().keySet().forEach(
- metricName ->
KafkaYammerMetrics.defaultRegistry().removeMetric(metricName)
- );
- }
}
diff --git
a/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java
b/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java
index c2cc3af450e..aecb35d8990 100644
---
a/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java
+++
b/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java
@@ -29,8 +29,8 @@ import org.mockito.Mockito;
import java.util.Iterator;
import java.util.List;
-import static
org.apache.kafka.server.share.fetch.ShareFetchTestUtils.clearYammerMetrics;
-import static
org.apache.kafka.server.share.fetch.ShareFetchTestUtils.yammerMetricValue;
+import static org.apache.kafka.server.util.ServerTestUtils.clearYammerMetrics;
+import static org.apache.kafka.server.util.ServerTestUtils.yammerMetricValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
index b1c5090625e..bd75c8f8885 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
@@ -46,7 +46,6 @@ import
org.apache.kafka.server.log.remote.quota.RLMQuotaManagerConfig;
import
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import
org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
-import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.util.MockScheduler;
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
@@ -67,7 +66,6 @@ import org.apache.kafka.storage.internals.log.UnifiedLog;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
import org.apache.kafka.test.TestUtils;
-import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.MetricName;
import org.junit.jupiter.api.AfterEach;
@@ -137,6 +135,8 @@ import static
org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.
import static
org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC;
import static
org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC;
import static
org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_STORAGE_THREAD_POOL_METRICS;
+import static org.apache.kafka.server.util.ServerTestUtils.clearYammerMetrics;
+import static org.apache.kafka.server.util.ServerTestUtils.yammerMetricValue;
import static org.apache.kafka.test.TestUtils.tempFile;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -273,7 +273,7 @@ public class RemoteLogManagerTest {
remoteLogManager.close();
remoteLogManager = null;
}
- kafka.utils.TestUtils.clearYammerMetrics();
+ clearYammerMetrics();
}
@Test
@@ -1240,18 +1240,9 @@ public class RemoteLogManagerTest {
safeLongYammerMetricValue("RemoteCopyLagSegments")));
}
- private Object yammerMetricValue(String name) {
- Gauge gauge = (Gauge)
KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream()
- .filter(e -> e.getKey().getMBeanName().endsWith(name))
- .findFirst()
- .get()
- .getValue();
- return gauge.value();
- }
-
private long safeLongYammerMetricValue(String name) {
try {
- return (long) yammerMetricValue(name);
+ return yammerMetricValue(name).longValue();
} catch (NoSuchElementException ex) {
return 0L;
}
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogReaderTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogReaderTest.java
index 9a6c9a54aa0..a6761b14f0a 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogReaderTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogReaderTest.java
@@ -16,12 +16,12 @@
*/
package org.apache.kafka.server.log.remote.storage;
-import kafka.utils.TestUtils;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.record.internal.Records;
import org.apache.kafka.server.log.remote.quota.RLMQuotaManager;
+import org.apache.kafka.server.util.ServerTestUtils;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
@@ -58,7 +58,7 @@ public class RemoteLogReaderTest {
@BeforeEach
public void setUp() throws Exception {
- TestUtils.clearYammerMetrics();
+ ServerTestUtils.clearYammerMetrics();
brokerTopicStats = new BrokerTopicStats(true);
when(timer.time(any(Callable.class))).thenAnswer(ans ->
ans.getArgument(0, Callable.class).call());
}
diff --git
a/storage/src/test/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsetsTest.java
b/storage/src/test/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsetsTest.java
index edcddb64aa2..29ddc9ec71c 100644
---
a/storage/src/test/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsetsTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsetsTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.server.purgatory;
-import kafka.utils.TestUtils;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
@@ -26,6 +25,7 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.internal.FileRecords;
import org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+import org.apache.kafka.server.util.ServerTestUtils;
import org.apache.kafka.server.util.timer.MockTimer;
import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder;
import org.apache.kafka.storage.internals.log.OffsetResultHolder;
@@ -66,7 +66,7 @@ public class DelayedRemoteListOffsetsTest {
@AfterEach
public void afterEach() throws Exception {
purgatory.shutdown();
- TestUtils.clearYammerMetrics();
+ ServerTestUtils.clearYammerMetrics();
}
@Test
diff --git
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogCleanerIntegrationTest.java
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogCleanerIntegrationTest.java
index 02d7a3c7c33..d549fb42927 100644
---
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogCleanerIntegrationTest.java
+++
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogCleanerIntegrationTest.java
@@ -33,6 +33,7 @@ import
org.apache.kafka.coordinator.transaction.TransactionLogConfig;
import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.apache.kafka.server.util.MockTime;
+import org.apache.kafka.server.util.ServerTestUtils;
import org.apache.kafka.server.util.ShutdownableThread;
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
@@ -118,7 +119,7 @@ public class LogCleanerIntegrationTest {
@AfterEach
public void teardown() throws IOException, InterruptedException {
- kafka.utils.TestUtils.clearYammerMetrics();
+ ServerTestUtils.clearYammerMetrics();
if (cleaner != null) {
cleaner.shutdown();
}
diff --git
a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
index 8b0d8d9abe0..d3bbe410fec 100644
---
a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
+++
b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
@@ -79,7 +79,6 @@ import
org.apache.kafka.storage.log.metrics.BrokerTopicMetrics;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
import org.apache.kafka.test.TestUtils;
-import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.Meter;
import org.junit.jupiter.api.AfterEach;
@@ -125,6 +124,7 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
+import static org.apache.kafka.server.util.ServerTestUtils.yammerMetricValue;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -4368,16 +4368,6 @@ public class UnifiedLogTest {
.orElseThrow(() -> new AssertionError("Unable to find metric "
+ metricName));
}
- @SuppressWarnings("unchecked")
- private Object yammerMetricValue(String name) {
- Gauge<Object> gauge = (Gauge<Object>)
KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream()
- .filter(e -> e.getKey().getMBeanName().endsWith(name))
- .findFirst()
- .get()
- .getValue();
- return gauge.value();
- }
-
@Test
public void testTransactionIndexUpdatedThroughReplication() throws
IOException {
short epoch = 0;