This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 994eae39929 KAFKA-17840: Move ReplicationQuotaManager to server module (#21702)
994eae39929 is described below

commit 994eae399299b0b6bd3af7c9ff6ed1b8b016b74b
Author: Mickael Maison <[email protected]>
AuthorDate: Mon Mar 23 12:46:01 2026 +0100

    KAFKA-17840: Move ReplicationQuotaManager to server module (#21702)
    
    Move ReplicationQuotaManager, ReplicaQuota and ReplicationQuotaManagerTest.
    
    Reviewers: Uladzislau Blok <[email protected]>, PoAn Yang <[email protected]>
---
 core/src/main/java/kafka/server/QuotaFactory.java  |   2 +
 .../main/scala/kafka/server/ConfigHandler.scala    |   1 +
 .../src/main/scala/kafka/server/DelayedFetch.scala |   1 +
 core/src/main/scala/kafka/server/KafkaApis.scala   |   1 +
 .../scala/kafka/server/LocalLeaderEndPoint.scala   |   1 +
 .../scala/kafka/server/RemoteLeaderEndPoint.scala  |   1 +
 .../kafka/server/ReplicaAlterLogDirsManager.scala  |   1 +
 .../kafka/server/ReplicaAlterLogDirsThread.scala   |   1 +
 .../scala/kafka/server/ReplicaFetcherManager.scala |   1 +
 .../scala/kafka/server/ReplicaFetcherThread.scala  |   1 +
 .../main/scala/kafka/server/ReplicaManager.scala   |   1 +
 .../kafka/server/share/DelayedShareFetchTest.java  |   2 +-
 .../server/share/SharePartitionManagerTest.java    |   2 +-
 .../kafka/server/DelayedFetchTest.scala            |   1 +
 .../kafka/server/AbstractFetcherManagerTest.scala  |   1 +
 .../unit/kafka/server/ControllerApisTest.scala     |   2 +-
 .../kafka/server/DynamicConfigChangeTest.scala     |   1 +
 .../scala/unit/kafka/server/KafkaApisTest.scala    |   2 +-
 .../server/ReplicaAlterLogDirsThreadTest.scala     |   1 +
 .../kafka/server/ReplicaFetcherThreadTest.scala    |   1 +
 .../server/ReplicaManagerConcurrencyTest.scala     |   1 +
 .../kafka/server/ReplicaManagerQuotasTest.scala    |   1 +
 .../unit/kafka/server/ReplicaManagerTest.scala     |   1 +
 .../kafka/server/ReplicationQuotaManagerTest.scala | 124 --------------------
 .../jmh/fetcher/ReplicaFetcherThreadBenchmark.java |   2 +-
 .../metadata/KRaftMetadataRequestBenchmark.java    |   2 +-
 .../apache/kafka/server/quota}/ReplicaQuota.java   |   2 +-
 .../server/quota}/ReplicationQuotaManager.java     |   4 +-
 .../server/quota/ReplicationQuotaManagerTest.java  | 129 +++++++++++++++++++++
 29 files changed, 157 insertions(+), 134 deletions(-)

diff --git a/core/src/main/java/kafka/server/QuotaFactory.java 
b/core/src/main/java/kafka/server/QuotaFactory.java
index 896e52da770..aef041dff46 100644
--- a/core/src/main/java/kafka/server/QuotaFactory.java
+++ b/core/src/main/java/kafka/server/QuotaFactory.java
@@ -29,6 +29,8 @@ import org.apache.kafka.server.quota.ClientQuotaCallback;
 import org.apache.kafka.server.quota.ClientQuotaManager;
 import org.apache.kafka.server.quota.ControllerMutationQuotaManager;
 import org.apache.kafka.server.quota.QuotaType;
+import org.apache.kafka.server.quota.ReplicaQuota;
+import org.apache.kafka.server.quota.ReplicationQuotaManager;
 
 import java.util.Optional;
 
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala 
b/core/src/main/scala/kafka/server/ConfigHandler.scala
index 3e82db29092..ed282893352 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -26,6 +26,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinator
 import org.apache.kafka.server.ClientMetricsManager
 import org.apache.kafka.server.common.StopPartition
 import org.apache.kafka.server.log.remote.TopicPartitionLog
+import org.apache.kafka.server.quota.ReplicationQuotaManager
 import org.apache.kafka.storage.internals.log.{LogStartOffsetIncrementReason, 
ThrottledReplicaListValidator, UnifiedLog}
 
 import scala.jdk.CollectionConverters._
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala 
b/core/src/main/scala/kafka/server/DelayedFetch.scala
index 0c9a561db57..f73acd19d98 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -27,6 +27,7 @@ import org.apache.kafka.common.protocol.Errors
 import 
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH,
 UNDEFINED_EPOCH_OFFSET}
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.purgatory.DelayedOperation
+import org.apache.kafka.server.quota.ReplicaQuota
 import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, 
FetchPartitionData}
 import org.apache.kafka.storage.internals.log.{FetchPartitionStatus, 
LogOffsetMetadata}
 
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 6745cb0ec31..193a1eb8c5b 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -65,6 +65,7 @@ import org.apache.kafka.security.DelegationTokenManager
 import org.apache.kafka.server.{ApiVersionManager, ClientMetricsManager, 
FetchManager, ProcessRole}
 import org.apache.kafka.server.authorizer._
 import org.apache.kafka.server.common.{GroupVersion, RequestLocal, 
ShareVersion, StreamsVersion, TransactionVersion}
+import org.apache.kafka.server.quota.{ReplicaQuota, ReplicationQuotaManager}
 import org.apache.kafka.server.share.context.ShareFetchContext
 import org.apache.kafka.server.share.{ErroneousAndValidPartitionData, 
SharePartitionKey}
 import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch
diff --git a/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala 
b/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
index 30e5231f8c8..bd1f230a6dd 100644
--- a/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
+++ b/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
@@ -29,6 +29,7 @@ import org.apache.kafka.common.{TopicIdPartition, 
TopicPartition, Uuid}
 import org.apache.kafka.server.common.OffsetAndEpoch
 import org.apache.kafka.server.network.BrokerEndPoint
 import org.apache.kafka.server.LeaderEndPoint
+import org.apache.kafka.server.quota.ReplicaQuota
 import org.apache.kafka.server.{PartitionFetchState, ReplicaFetch, 
ResultWithPartitions}
 import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, 
FetchPartitionData}
 
diff --git a/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala 
b/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala
index ba4b1b51739..67d4bb63f36 100644
--- a/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala
+++ b/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala
@@ -31,6 +31,7 @@ import org.apache.kafka.common.requests.{FetchRequest, 
FetchResponse, ListOffset
 import org.apache.kafka.server.common.{MetadataVersion, OffsetAndEpoch}
 import org.apache.kafka.server.network.BrokerEndPoint
 import org.apache.kafka.server.LeaderEndPoint
+import org.apache.kafka.server.quota.ReplicaQuota
 import org.apache.kafka.server.{PartitionFetchState, ReplicaFetch, 
ResultWithPartitions}
 
 import scala.jdk.CollectionConverters._
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsManager.scala 
b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsManager.scala
index e0473166b36..2a8ccd8ed5c 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsManager.scala
@@ -21,6 +21,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.server.common.DirectoryEventHandler
 import org.apache.kafka.server.network.BrokerEndPoint
 import org.apache.kafka.server.LeaderEndPoint
+import org.apache.kafka.server.quota.ReplicationQuotaManager
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 
 class ReplicaAlterLogDirsManager(brokerConfig: KafkaConfig,
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala 
b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
index 7d2e0288456..f3d8fde7160 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -26,6 +26,7 @@ import org.apache.kafka.storage.internals.log.{LogAppendInfo, 
LogStartOffsetIncr
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.apache.kafka.server.LeaderEndPoint
 import org.apache.kafka.server.PartitionFetchState
+import org.apache.kafka.server.quota.ReplicationQuotaManager
 
 import java.util.Optional
 import java.util.concurrent.ConcurrentHashMap
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala 
b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
index 96308fb400f..d7510fc6214 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
@@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.{LogContext, Time}
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.network.BrokerEndPoint
 import org.apache.kafka.server.LeaderEndPoint
+import org.apache.kafka.server.quota.ReplicationQuotaManager
 
 class ReplicaFetcherManager(brokerConfig: KafkaConfig,
                             protected val replicaManager: ReplicaManager,
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 
b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 8444704fb10..4e4ad9772c2 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -22,6 +22,7 @@ import org.apache.kafka.common.requests.FetchResponse
 import org.apache.kafka.server.common.OffsetAndEpoch
 import org.apache.kafka.storage.internals.log.{LogAppendInfo, 
LogStartOffsetIncrementReason}
 import org.apache.kafka.server.LeaderEndPoint
+import org.apache.kafka.server.quota.ReplicaQuota
 
 import java.util.Optional
 import scala.collection.mutable
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 8bccb4dc8e4..e612a33cec7 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -60,6 +60,7 @@ import org.apache.kafka.server.network.BrokerEndPoint
 import org.apache.kafka.server.partition.PartitionListener
 import 
org.apache.kafka.server.purgatory.DelayedProduce.PartitionStatusValidator.Result
 import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, 
DelayedOperationPurgatory, DelayedProduce, DelayedRemoteFetch, 
DelayedRemoteListOffsets, DeleteRecordsPartitionStatus, 
ListOffsetsPartitionStatus, TopicPartitionOperationKey}
+import org.apache.kafka.server.quota.{ReplicaQuota, ReplicationQuotaManager}
 import org.apache.kafka.server.share.fetch.{DelayedShareFetchKey, 
DelayedShareFetchPartitionKey}
 import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
 import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java 
b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
index 850208bf22a..0f5c3b2a545 100644
--- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
+++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
@@ -19,7 +19,6 @@ package kafka.server.share;
 import kafka.cluster.Partition;
 import kafka.server.QuotaFactory;
 import kafka.server.ReplicaManager;
-import kafka.server.ReplicaQuota;
 
 import org.apache.kafka.clients.consumer.internals.ShareAcquireMode;
 import org.apache.kafka.common.TopicIdPartition;
@@ -36,6 +35,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.server.log.remote.storage.RemoteLogManager;
 import org.apache.kafka.server.purgatory.DelayedOperationKey;
 import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
+import org.apache.kafka.server.quota.ReplicaQuota;
 import org.apache.kafka.server.share.SharePartitionKey;
 import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
 import org.apache.kafka.server.share.fetch.PartitionMaxBytesStrategy;
diff --git 
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index 134f4524cdd..635c10c6424 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -18,7 +18,6 @@ package kafka.server.share;
 
 import kafka.cluster.Partition;
 import kafka.server.ReplicaManager;
-import kafka.server.ReplicaQuota;
 import kafka.server.share.SharePartitionManager.SharePartitionListener;
 
 import org.apache.kafka.clients.consumer.AcknowledgeType;
@@ -55,6 +54,7 @@ import org.apache.kafka.coordinator.group.GroupConfigManager;
 import org.apache.kafka.server.common.ShareVersion;
 import org.apache.kafka.server.purgatory.DelayedOperationKey;
 import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
+import org.apache.kafka.server.quota.ReplicaQuota;
 import org.apache.kafka.server.share.CachedSharePartition;
 import org.apache.kafka.server.share.ErroneousAndValidPartitionData;
 import org.apache.kafka.server.share.SharePartitionKey;
diff --git 
a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala 
b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
index 3865c8ea2f2..5410a341737 100644
--- a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
@@ -25,6 +25,7 @@ import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEnd
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.internal.MemoryRecords
 import org.apache.kafka.common.requests.FetchRequest
+import org.apache.kafka.server.quota.ReplicaQuota
 import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, 
FetchPartitionData}
 import org.apache.kafka.storage.internals.log.{FetchDataInfo, 
FetchPartitionStatus, LogOffsetMetadata, LogOffsetSnapshot, LogReadResult}
 import org.junit.jupiter.api.Test
diff --git 
a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
index d79c39817e2..a58134dd90f 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
@@ -32,6 +32,7 @@ import org.apache.kafka.server.ReplicaState
 import org.apache.kafka.server.ResultWithPartitions
 import org.apache.kafka.server.PartitionFetchState
 import org.apache.kafka.server.LeaderEndPoint
+import org.apache.kafka.server.quota.ReplicationQuotaManager
 import org.apache.kafka.storage.internals.log.LogAppendInfo
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.junit.jupiter.api.Assertions._
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala 
b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index d3e3d548029..7d5d3bd076d 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -59,7 +59,7 @@ import org.apache.kafka.server.SimpleApiVersionManager
 import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, 
AuthorizationResult, Authorizer}
 import org.apache.kafka.server.common.{ApiMessageAndVersion, 
FinalizedFeatures, KRaftVersion, MetadataVersion, ProducerIdsBlock, 
RequestLocal}
 import org.apache.kafka.server.config.ServerConfigs
-import org.apache.kafka.server.quota.{ClientQuotaManager, 
ControllerMutationQuota, ControllerMutationQuotaManager}
+import org.apache.kafka.server.quota.{ClientQuotaManager, 
ControllerMutationQuota, ControllerMutationQuotaManager, 
ReplicationQuotaManager}
 import org.apache.kafka.storage.internals.log.CleanerConfig
 import org.apache.kafka.test.TestUtils
 import org.junit.jupiter.api.Assertions._
diff --git 
a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala 
b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index 2f13f0d2dac..f067219b9ae 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -37,6 +37,7 @@ import org.apache.kafka.metadata.MetadataCache
 import org.apache.kafka.server.config.{QuotaConfig, ServerLogConfigs}
 import org.apache.kafka.server.log.remote.TopicPartitionLog
 import org.apache.kafka.server.log.remote.storage.RemoteLogManager
+import org.apache.kafka.server.quota.ReplicationQuotaManager
 import org.apache.kafka.storage.internals.log.{LogConfig, UnifiedLog}
 import org.apache.kafka.test.TestUtils.assertFutureThrows
 import org.junit.jupiter.api.Assertions._
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index aaf5409e5f3..c5974e7e317 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -96,7 +96,7 @@ import org.apache.kafka.server.config.{ReplicationConfigs, 
ServerConfigs, Server
 import org.apache.kafka.server.logger.LoggingController
 import org.apache.kafka.server.metrics.ClientMetricsTestUtils
 import org.apache.kafka.server.share.{CachedSharePartition, 
ErroneousAndValidPartitionData, SharePartitionKey}
-import org.apache.kafka.server.quota.{ClientQuotaManager, 
ControllerMutationQuota, ControllerMutationQuotaManager, ThrottleCallback}
+import org.apache.kafka.server.quota.{ClientQuotaManager, 
ControllerMutationQuota, ControllerMutationQuotaManager, ReplicaQuota, 
ReplicationQuotaManager, ThrottleCallback}
 import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch
 import org.apache.kafka.server.share.context.{FinalContext, 
ShareSessionContext}
 import org.apache.kafka.server.share.session.{ShareSession, ShareSessionKey}
diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
index f16d72a3030..e7db8a743f7 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
@@ -32,6 +32,7 @@ import org.apache.kafka.metadata.KRaftMetadataCache
 import org.apache.kafka.server.{PartitionFetchState, ReplicaState, common}
 import org.apache.kafka.server.common.{DirectoryEventHandler, KRaftVersion, 
OffsetAndEpoch}
 import org.apache.kafka.server.network.BrokerEndPoint
+import org.apache.kafka.server.quota.{ReplicaQuota, ReplicationQuotaManager}
 import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, 
FetchPartitionData}
 import org.apache.kafka.storage.internals.log.UnifiedLog
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index b77e36ad184..3bd2fc5b133 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -39,6 +39,7 @@ import org.apache.kafka.server.network.BrokerEndPoint
 import org.apache.kafka.server.ReplicaState
 import org.apache.kafka.server.PartitionFetchState
 import org.apache.kafka.server.config.ReplicationConfigs
+import org.apache.kafka.server.quota.{ReplicaQuota, ReplicationQuotaManager}
 import org.apache.kafka.storage.internals.log.{LogAppendInfo, LogConfig, 
RecordValidationStats, UnifiedLog}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.junit.jupiter.api.Assertions._
diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
index cc532d05e00..3386a8eee86 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
@@ -43,6 +43,7 @@ import org.apache.kafka.raft.{KRaftConfigs, QuorumConfig}
 import org.apache.kafka.server.HostedPartition
 import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion, 
TopicIdPartition}
 import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
+import org.apache.kafka.server.quota.ReplicationQuotaManager
 import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, 
FetchPartitionData}
 import org.apache.kafka.server.util.{MockTime, ShutdownableThread}
 import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, 
LogDirFailureChannel}
diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index a2e5128f30b..2f5e27464db 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -30,6 +30,7 @@ import 
org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.metadata.{KRaftMetadataCache, LeaderRecoveryState}
 import org.apache.kafka.server.common.KRaftVersion
+import org.apache.kafka.server.quota.ReplicaQuota
 import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams}
 import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
 import org.apache.kafka.storage.internals.log.{FetchDataInfo, 
FetchPartitionStatus, LogConfig, LogDirFailureChannel, LogOffsetMetadata, 
LogOffsetSnapshot, UnifiedLog}
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index b83495ea3db..f92551ef1a9 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -67,6 +67,7 @@ import org.apache.kafka.server.log.remote.storage._
 import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
 import org.apache.kafka.server.network.BrokerEndPoint
 import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, 
DelayedOperationPurgatory, DelayedProduce, DelayedRemoteFetch, 
DelayedRemoteListOffsets}
+import org.apache.kafka.server.quota.{ReplicaQuota, ReplicationQuotaManager}
 import org.apache.kafka.server.{HostedPartition, PartitionFetchState}
 import org.apache.kafka.server.share.SharePartitionKey
 import org.apache.kafka.server.share.fetch.{DelayedShareFetchGroupKey, 
DelayedShareFetchKey, ShareFetch}
diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala
deleted file mode 100644
index acb5c22ec76..00000000000
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  * http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-package kafka.server
-
-import java.util.Collections
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.metrics.{MetricConfig, Metrics, Quota}
-import org.apache.kafka.common.utils.MockTime
-import org.apache.kafka.server.config.ReplicationQuotaManagerConfig
-import org.apache.kafka.server.quota.QuotaType
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
-import org.junit.jupiter.api.{AfterEach, Test}
-
-import scala.jdk.CollectionConverters._
-
-class ReplicationQuotaManagerTest {
-  private val time = new MockTime
-  private val metrics = new Metrics(new MetricConfig(), 
Collections.emptyList(), time)
-
-  @AfterEach
-  def tearDown(): Unit = {
-    metrics.close()
-  }
-
-  @Test
-  def shouldThrottleOnlyDefinedReplicas(): Unit = {
-    val quota = new ReplicationQuotaManager(new 
ReplicationQuotaManagerConfig(), metrics, QuotaType.FETCH, time)
-    quota.markThrottled("topic1", java.util.List.of(1, 2, 3))
-
-    assertTrue(quota.isThrottled(tp1(1)))
-    assertTrue(quota.isThrottled(tp1(2)))
-    assertTrue(quota.isThrottled(tp1(3)))
-    assertFalse(quota.isThrottled(tp1(4)))
-  }
-
-  @Test
-  def shouldExceedQuotaThenReturnBackBelowBoundAsTimePasses(): Unit = {
-    val quota = new ReplicationQuotaManager(new 
ReplicationQuotaManagerConfig(10, 1), metrics, QuotaType.LEADER_REPLICATION, 
time)
-
-    //Given
-    quota.updateQuota(new Quota(100, true))
-
-    //Quota should not be broken when we start
-    assertFalse(quota.isQuotaExceeded)
-
-    //First window is fixed, so we'll skip it
-    time.sleep(1000)
-
-    //When we record up to the quota value after half a window
-    time.sleep(500)
-    quota.record(1)
-
-    //Then it should not break the quota
-    assertFalse(quota.isQuotaExceeded)
-
-    //When we record half the quota (half way through the window), we still 
should not break
-    quota.record(149) //150B, 1.5s
-    assertFalse(quota.isQuotaExceeded)
-
-    //Add a byte to push over quota
-    quota.record(1) //151B, 1.5s
-
-    //Then it should break the quota
-    assertEquals(151 / 1.5, rate(metrics), 0) //151B, 1.5s
-    assertTrue(quota.isQuotaExceeded)
-
-    //When we sleep for the remaining half the window
-    time.sleep(500) //151B, 2s
-
-    //Then Our rate should have halved (i.e back down below the quota)
-    assertFalse(quota.isQuotaExceeded)
-    assertEquals(151d / 2, rate(metrics), 0.1) //151B, 2s
-
-    //When we sleep for another half a window (now half way through second 
window)
-    time.sleep(500)
-    quota.record(99) //250B, 2.5s
-
-    //Then the rate should be exceeded again
-    assertEquals(250 / 2.5, rate(metrics), 0) //250B, 2.5s
-    assertFalse(quota.isQuotaExceeded)
-    quota.record(1)
-    assertTrue(quota.isQuotaExceeded)
-    assertEquals(251 / 2.5, rate(metrics), 0)
-
-    //Sleep for 2 more window
-    time.sleep(2 * 1000) //so now at 3.5s
-    assertFalse(quota.isQuotaExceeded)
-    assertEquals(251d / 4.5, rate(metrics), 0)
-  }
-
-  def rate(metrics: Metrics): Double = {
-    val metricName = metrics.metricName("byte-rate", 
QuotaType.LEADER_REPLICATION.toString, "Tracking byte-rate for " + 
QuotaType.LEADER_REPLICATION)
-    val leaderThrottledRate = 
metrics.metrics.asScala(metricName).metricValue.asInstanceOf[Double]
-    leaderThrottledRate
-  }
-
-  @Test
-  def shouldSupportWildcardThrottledReplicas(): Unit = {
-    val quota = new ReplicationQuotaManager(new 
ReplicationQuotaManagerConfig(), metrics, QuotaType.LEADER_REPLICATION, time)
-
-    //When
-    quota.markThrottled("MyTopic")
-
-    //Then
-    assertTrue(quota.isThrottled(new TopicPartition("MyTopic", 0)))
-    assertFalse(quota.isThrottled(new TopicPartition("MyOtherTopic", 0)))
-  }
-
-  private def tp1(id: Int): TopicPartition = new TopicPartition("topic1", id)
-}
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index 75e968c58d8..ade72c71601 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -29,7 +29,6 @@ import kafka.server.QuotaFactory;
 import kafka.server.RemoteLeaderEndPoint;
 import kafka.server.ReplicaFetcherThread;
 import kafka.server.ReplicaManager;
-import kafka.server.ReplicaQuota;
 import kafka.server.builders.LogManagerBuilder;
 import kafka.server.builders.ReplicaManagerBuilder;
 
@@ -59,6 +58,7 @@ import org.apache.kafka.metadata.PartitionRegistration;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.common.OffsetAndEpoch;
 import org.apache.kafka.server.network.BrokerEndPoint;
+import org.apache.kafka.server.quota.ReplicaQuota;
 import org.apache.kafka.server.util.KafkaScheduler;
 import org.apache.kafka.server.util.MockTime;
 import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints;
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
index 5b0cf9de898..ce91134e1a6 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
@@ -26,7 +26,6 @@ import kafka.server.KafkaApis;
 import kafka.server.KafkaConfig;
 import kafka.server.QuotaFactory;
 import kafka.server.ReplicaManager;
-import kafka.server.ReplicationQuotaManager;
 import kafka.server.builders.KafkaApisBuilder;
 import kafka.server.share.SharePartitionManager;
 
@@ -65,6 +64,7 @@ import org.apache.kafka.server.common.KRaftVersion;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.quota.ClientQuotaManager;
 import org.apache.kafka.server.quota.ControllerMutationQuotaManager;
+import org.apache.kafka.server.quota.ReplicationQuotaManager;
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
 
 import org.mockito.Mockito;
diff --git a/core/src/main/java/kafka/server/ReplicaQuota.java 
b/server/src/main/java/org/apache/kafka/server/quota/ReplicaQuota.java
similarity index 96%
rename from core/src/main/java/kafka/server/ReplicaQuota.java
rename to server/src/main/java/org/apache/kafka/server/quota/ReplicaQuota.java
index 7d65ba25001..97efae8b94b 100644
--- a/core/src/main/java/kafka/server/ReplicaQuota.java
+++ b/server/src/main/java/org/apache/kafka/server/quota/ReplicaQuota.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package kafka.server;
+package org.apache.kafka.server.quota;
 
 import org.apache.kafka.common.TopicPartition;
 
diff --git a/core/src/main/java/kafka/server/ReplicationQuotaManager.java 
b/server/src/main/java/org/apache/kafka/server/quota/ReplicationQuotaManager.java
similarity index 97%
rename from core/src/main/java/kafka/server/ReplicationQuotaManager.java
rename to 
server/src/main/java/org/apache/kafka/server/quota/ReplicationQuotaManager.java
index 9301e59dfe6..c0d416f955a 100644
--- a/core/src/main/java/kafka/server/ReplicationQuotaManager.java
+++ 
b/server/src/main/java/org/apache/kafka/server/quota/ReplicationQuotaManager.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package kafka.server;
+package org.apache.kafka.server.quota;
 
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
@@ -27,8 +27,6 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.SimpleRate;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.server.config.ReplicationQuotaManagerConfig;
-import org.apache.kafka.server.quota.QuotaType;
-import org.apache.kafka.server.quota.SensorAccess;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git 
a/server/src/test/java/org/apache/kafka/server/quota/ReplicationQuotaManagerTest.java
 
b/server/src/test/java/org/apache/kafka/server/quota/ReplicationQuotaManagerTest.java
new file mode 100644
index 00000000000..17cf9492f4f
--- /dev/null
+++ 
b/server/src/test/java/org/apache/kafka/server/quota/ReplicationQuotaManagerTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.quota;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Quota;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.server.config.ReplicationQuotaManagerConfig;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ReplicationQuotaManagerTest {
+
+    private final MockTime time = new MockTime();
+    private final Metrics metrics = new Metrics(new MetricConfig(), List.of(), 
time);
+
+    @AfterEach
+    public void tearDown() {
+        metrics.close();
+    }
+
+    @Test
+    public void shouldThrottleOnlyDefinedReplicas() {
+        ReplicationQuotaManager quota = new ReplicationQuotaManager(new 
ReplicationQuotaManagerConfig(), metrics, QuotaType.FETCH, time);
+        quota.markThrottled("topic1", List.of(1, 2, 3));
+
+        assertTrue(quota.isThrottled(new TopicPartition("topic1", 1)));
+        assertTrue(quota.isThrottled(new TopicPartition("topic1", 2)));
+        assertTrue(quota.isThrottled(new TopicPartition("topic1", 3)));
+        assertFalse(quota.isThrottled(new TopicPartition("topic1", 4)));
+    }
+
+    @Test
+    public void shouldExceedQuotaThenReturnBackBelowBoundAsTimePasses() {
+        ReplicationQuotaManager quota = new ReplicationQuotaManager(new 
ReplicationQuotaManagerConfig(10, 1), metrics, QuotaType.LEADER_REPLICATION, 
time);
+
+        //Given
+        quota.updateQuota(new Quota(100, true));
+
+        //Quota should not be broken when we start
+        assertFalse(quota.isQuotaExceeded());
+
+        //First window is fixed, so we'll skip it
+        time.sleep(1000);
+
+        //When we record a single byte after half a window
+        time.sleep(500);
+        quota.record(1);
+
+        //Then it should not break the quota
+        assertFalse(quota.isQuotaExceeded());
+
+        //When we record up to the quota (150B over 1.5s = 100B/s), we still 
should not break
+        quota.record(149); //150B, 1.5s
+        assertFalse(quota.isQuotaExceeded());
+
+        //Add a byte to push over quota
+        quota.record(1); //151B, 1.5s
+
+        //Then it should break the quota
+        assertEquals(151 / 1.5, rate(metrics), 0); //151B, 1.5s
+        assertTrue(quota.isQuotaExceeded());
+
+        //When we sleep for the remaining half the window
+        time.sleep(500); //151B, 2s
+
+        //Then our rate should have dropped (i.e. back down below the quota)
+        assertFalse(quota.isQuotaExceeded());
+        assertEquals(151d / 2, rate(metrics), 0.1); //151B, 2s
+
+        //When we sleep for another half a window (now halfway through second 
window)
+        time.sleep(500);
+        quota.record(99); //250B, 2.5s
+
+        //Then we are exactly at the quota (not exceeded); one more byte exceeds it again
+        assertEquals(250 / 2.5, rate(metrics), 0); //250B, 2.5s
+        assertFalse(quota.isQuotaExceeded());
+        quota.record(1);
+        assertTrue(quota.isQuotaExceeded());
+        assertEquals(251 / 2.5, rate(metrics), 0);
+
+        //Sleep for 2 more windows
+        time.sleep(2 * 1000); //so now at 4.5s
+        assertFalse(quota.isQuotaExceeded());
+        assertEquals(251 / 4.5, rate(metrics), 0);
+    }
+
+    private double rate(Metrics metrics) {
+        MetricName metricName = metrics.metricName("byte-rate", 
QuotaType.LEADER_REPLICATION.toString(), "Tracking byte-rate for " + 
QuotaType.LEADER_REPLICATION);
+        return (double) metrics.metrics().get(metricName).metricValue();
+    }
+
+    @Test
+    public void shouldSupportWildcardThrottledReplicas() {
+        ReplicationQuotaManager quota = new ReplicationQuotaManager(new 
ReplicationQuotaManagerConfig(), metrics, QuotaType.LEADER_REPLICATION, time);
+
+        //When
+        quota.markThrottled("MyTopic");
+
+        //Then
+        assertTrue(quota.isThrottled(new TopicPartition("MyTopic", 0)));
+        assertFalse(quota.isThrottled(new TopicPartition("MyOtherTopic", 0)));
+    }
+
+}


Reply via email to