This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 994eae39929 KAFKA-17840: Move ReplicationQuotaManager to server module
(#21702)
994eae39929 is described below
commit 994eae399299b0b6bd3af7c9ff6ed1b8b016b74b
Author: Mickael Maison <[email protected]>
AuthorDate: Mon Mar 23 12:46:01 2026 +0100
KAFKA-17840: Move ReplicationQuotaManager to server module (#21702)
Move ReplicationQuotaManager, ReplicaQuota and ReplicationQuotaManagerTest.
Reviewers: Uladzislau Blok <[email protected]>, PoAn Yang
<[email protected]>
---
core/src/main/java/kafka/server/QuotaFactory.java | 2 +
.../main/scala/kafka/server/ConfigHandler.scala | 1 +
.../src/main/scala/kafka/server/DelayedFetch.scala | 1 +
core/src/main/scala/kafka/server/KafkaApis.scala | 1 +
.../scala/kafka/server/LocalLeaderEndPoint.scala | 1 +
.../scala/kafka/server/RemoteLeaderEndPoint.scala | 1 +
.../kafka/server/ReplicaAlterLogDirsManager.scala | 1 +
.../kafka/server/ReplicaAlterLogDirsThread.scala | 1 +
.../scala/kafka/server/ReplicaFetcherManager.scala | 1 +
.../scala/kafka/server/ReplicaFetcherThread.scala | 1 +
.../main/scala/kafka/server/ReplicaManager.scala | 1 +
.../kafka/server/share/DelayedShareFetchTest.java | 2 +-
.../server/share/SharePartitionManagerTest.java | 2 +-
.../kafka/server/DelayedFetchTest.scala | 1 +
.../kafka/server/AbstractFetcherManagerTest.scala | 1 +
.../unit/kafka/server/ControllerApisTest.scala | 2 +-
.../kafka/server/DynamicConfigChangeTest.scala | 1 +
.../scala/unit/kafka/server/KafkaApisTest.scala | 2 +-
.../server/ReplicaAlterLogDirsThreadTest.scala | 1 +
.../kafka/server/ReplicaFetcherThreadTest.scala | 1 +
.../server/ReplicaManagerConcurrencyTest.scala | 1 +
.../kafka/server/ReplicaManagerQuotasTest.scala | 1 +
.../unit/kafka/server/ReplicaManagerTest.scala | 1 +
.../kafka/server/ReplicationQuotaManagerTest.scala | 124 --------------------
.../jmh/fetcher/ReplicaFetcherThreadBenchmark.java | 2 +-
.../metadata/KRaftMetadataRequestBenchmark.java | 2 +-
.../apache/kafka/server/quota}/ReplicaQuota.java | 2 +-
.../server/quota}/ReplicationQuotaManager.java | 4 +-
.../server/quota/ReplicationQuotaManagerTest.java | 129 +++++++++++++++++++++
29 files changed, 157 insertions(+), 134 deletions(-)
diff --git a/core/src/main/java/kafka/server/QuotaFactory.java
b/core/src/main/java/kafka/server/QuotaFactory.java
index 896e52da770..aef041dff46 100644
--- a/core/src/main/java/kafka/server/QuotaFactory.java
+++ b/core/src/main/java/kafka/server/QuotaFactory.java
@@ -29,6 +29,8 @@ import org.apache.kafka.server.quota.ClientQuotaCallback;
import org.apache.kafka.server.quota.ClientQuotaManager;
import org.apache.kafka.server.quota.ControllerMutationQuotaManager;
import org.apache.kafka.server.quota.QuotaType;
+import org.apache.kafka.server.quota.ReplicaQuota;
+import org.apache.kafka.server.quota.ReplicationQuotaManager;
import java.util.Optional;
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala
b/core/src/main/scala/kafka/server/ConfigHandler.scala
index 3e82db29092..ed282893352 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -26,6 +26,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.server.ClientMetricsManager
import org.apache.kafka.server.common.StopPartition
import org.apache.kafka.server.log.remote.TopicPartitionLog
+import org.apache.kafka.server.quota.ReplicationQuotaManager
import org.apache.kafka.storage.internals.log.{LogStartOffsetIncrementReason,
ThrottledReplicaListValidator, UnifiedLog}
import scala.jdk.CollectionConverters._
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala
b/core/src/main/scala/kafka/server/DelayedFetch.scala
index 0c9a561db57..f73acd19d98 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -27,6 +27,7 @@ import org.apache.kafka.common.protocol.Errors
import
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH,
UNDEFINED_EPOCH_OFFSET}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.purgatory.DelayedOperation
+import org.apache.kafka.server.quota.ReplicaQuota
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams,
FetchPartitionData}
import org.apache.kafka.storage.internals.log.{FetchPartitionStatus,
LogOffsetMetadata}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 6745cb0ec31..193a1eb8c5b 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -65,6 +65,7 @@ import org.apache.kafka.security.DelegationTokenManager
import org.apache.kafka.server.{ApiVersionManager, ClientMetricsManager,
FetchManager, ProcessRole}
import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.common.{GroupVersion, RequestLocal,
ShareVersion, StreamsVersion, TransactionVersion}
+import org.apache.kafka.server.quota.{ReplicaQuota, ReplicationQuotaManager}
import org.apache.kafka.server.share.context.ShareFetchContext
import org.apache.kafka.server.share.{ErroneousAndValidPartitionData,
SharePartitionKey}
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch
diff --git a/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
b/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
index 30e5231f8c8..bd1f230a6dd 100644
--- a/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
+++ b/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
@@ -29,6 +29,7 @@ import org.apache.kafka.common.{TopicIdPartition,
TopicPartition, Uuid}
import org.apache.kafka.server.common.OffsetAndEpoch
import org.apache.kafka.server.network.BrokerEndPoint
import org.apache.kafka.server.LeaderEndPoint
+import org.apache.kafka.server.quota.ReplicaQuota
import org.apache.kafka.server.{PartitionFetchState, ReplicaFetch,
ResultWithPartitions}
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams,
FetchPartitionData}
diff --git a/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala
b/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala
index ba4b1b51739..67d4bb63f36 100644
--- a/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala
+++ b/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala
@@ -31,6 +31,7 @@ import org.apache.kafka.common.requests.{FetchRequest,
FetchResponse, ListOffset
import org.apache.kafka.server.common.{MetadataVersion, OffsetAndEpoch}
import org.apache.kafka.server.network.BrokerEndPoint
import org.apache.kafka.server.LeaderEndPoint
+import org.apache.kafka.server.quota.ReplicaQuota
import org.apache.kafka.server.{PartitionFetchState, ReplicaFetch,
ResultWithPartitions}
import scala.jdk.CollectionConverters._
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsManager.scala
b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsManager.scala
index e0473166b36..2a8ccd8ed5c 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsManager.scala
@@ -21,6 +21,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.server.common.DirectoryEventHandler
import org.apache.kafka.server.network.BrokerEndPoint
import org.apache.kafka.server.LeaderEndPoint
+import org.apache.kafka.server.quota.ReplicationQuotaManager
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
class ReplicaAlterLogDirsManager(brokerConfig: KafkaConfig,
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
index 7d2e0288456..f3d8fde7160 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -26,6 +26,7 @@ import org.apache.kafka.storage.internals.log.{LogAppendInfo,
LogStartOffsetIncr
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.apache.kafka.server.LeaderEndPoint
import org.apache.kafka.server.PartitionFetchState
+import org.apache.kafka.server.quota.ReplicationQuotaManager
import java.util.Optional
import java.util.concurrent.ConcurrentHashMap
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
index 96308fb400f..d7510fc6214 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
@@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.network.BrokerEndPoint
import org.apache.kafka.server.LeaderEndPoint
+import org.apache.kafka.server.quota.ReplicationQuotaManager
class ReplicaFetcherManager(brokerConfig: KafkaConfig,
protected val replicaManager: ReplicaManager,
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 8444704fb10..4e4ad9772c2 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -22,6 +22,7 @@ import org.apache.kafka.common.requests.FetchResponse
import org.apache.kafka.server.common.OffsetAndEpoch
import org.apache.kafka.storage.internals.log.{LogAppendInfo,
LogStartOffsetIncrementReason}
import org.apache.kafka.server.LeaderEndPoint
+import org.apache.kafka.server.quota.ReplicaQuota
import java.util.Optional
import scala.collection.mutable
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 8bccb4dc8e4..e612a33cec7 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -60,6 +60,7 @@ import org.apache.kafka.server.network.BrokerEndPoint
import org.apache.kafka.server.partition.PartitionListener
import
org.apache.kafka.server.purgatory.DelayedProduce.PartitionStatusValidator.Result
import org.apache.kafka.server.purgatory.{DelayedDeleteRecords,
DelayedOperationPurgatory, DelayedProduce, DelayedRemoteFetch,
DelayedRemoteListOffsets, DeleteRecordsPartitionStatus,
ListOffsetsPartitionStatus, TopicPartitionOperationKey}
+import org.apache.kafka.server.quota.{ReplicaQuota, ReplicationQuotaManager}
import org.apache.kafka.server.share.fetch.{DelayedShareFetchKey,
DelayedShareFetchPartitionKey}
import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
index 850208bf22a..0f5c3b2a545 100644
--- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
+++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
@@ -19,7 +19,6 @@ package kafka.server.share;
import kafka.cluster.Partition;
import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager;
-import kafka.server.ReplicaQuota;
import org.apache.kafka.clients.consumer.internals.ShareAcquireMode;
import org.apache.kafka.common.TopicIdPartition;
@@ -36,6 +35,7 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.log.remote.storage.RemoteLogManager;
import org.apache.kafka.server.purgatory.DelayedOperationKey;
import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
+import org.apache.kafka.server.quota.ReplicaQuota;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
import org.apache.kafka.server.share.fetch.PartitionMaxBytesStrategy;
diff --git
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index 134f4524cdd..635c10c6424 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -18,7 +18,6 @@ package kafka.server.share;
import kafka.cluster.Partition;
import kafka.server.ReplicaManager;
-import kafka.server.ReplicaQuota;
import kafka.server.share.SharePartitionManager.SharePartitionListener;
import org.apache.kafka.clients.consumer.AcknowledgeType;
@@ -55,6 +54,7 @@ import org.apache.kafka.coordinator.group.GroupConfigManager;
import org.apache.kafka.server.common.ShareVersion;
import org.apache.kafka.server.purgatory.DelayedOperationKey;
import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
+import org.apache.kafka.server.quota.ReplicaQuota;
import org.apache.kafka.server.share.CachedSharePartition;
import org.apache.kafka.server.share.ErroneousAndValidPartitionData;
import org.apache.kafka.server.share.SharePartitionKey;
diff --git
a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
index 3865c8ea2f2..5410a341737 100644
--- a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
@@ -25,6 +25,7 @@ import
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEnd
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.internal.MemoryRecords
import org.apache.kafka.common.requests.FetchRequest
+import org.apache.kafka.server.quota.ReplicaQuota
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams,
FetchPartitionData}
import org.apache.kafka.storage.internals.log.{FetchDataInfo,
FetchPartitionStatus, LogOffsetMetadata, LogOffsetSnapshot, LogReadResult}
import org.junit.jupiter.api.Test
diff --git
a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
index d79c39817e2..a58134dd90f 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
@@ -32,6 +32,7 @@ import org.apache.kafka.server.ReplicaState
import org.apache.kafka.server.ResultWithPartitions
import org.apache.kafka.server.PartitionFetchState
import org.apache.kafka.server.LeaderEndPoint
+import org.apache.kafka.server.quota.ReplicationQuotaManager
import org.apache.kafka.storage.internals.log.LogAppendInfo
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions._
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index d3e3d548029..7d5d3bd076d 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -59,7 +59,7 @@ import org.apache.kafka.server.SimpleApiVersionManager
import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext,
AuthorizationResult, Authorizer}
import org.apache.kafka.server.common.{ApiMessageAndVersion,
FinalizedFeatures, KRaftVersion, MetadataVersion, ProducerIdsBlock,
RequestLocal}
import org.apache.kafka.server.config.ServerConfigs
-import org.apache.kafka.server.quota.{ClientQuotaManager,
ControllerMutationQuota, ControllerMutationQuotaManager}
+import org.apache.kafka.server.quota.{ClientQuotaManager,
ControllerMutationQuota, ControllerMutationQuotaManager,
ReplicationQuotaManager}
import org.apache.kafka.storage.internals.log.CleanerConfig
import org.apache.kafka.test.TestUtils
import org.junit.jupiter.api.Assertions._
diff --git
a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index 2f13f0d2dac..f067219b9ae 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -37,6 +37,7 @@ import org.apache.kafka.metadata.MetadataCache
import org.apache.kafka.server.config.{QuotaConfig, ServerLogConfigs}
import org.apache.kafka.server.log.remote.TopicPartitionLog
import org.apache.kafka.server.log.remote.storage.RemoteLogManager
+import org.apache.kafka.server.quota.ReplicationQuotaManager
import org.apache.kafka.storage.internals.log.{LogConfig, UnifiedLog}
import org.apache.kafka.test.TestUtils.assertFutureThrows
import org.junit.jupiter.api.Assertions._
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index aaf5409e5f3..c5974e7e317 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -96,7 +96,7 @@ import org.apache.kafka.server.config.{ReplicationConfigs,
ServerConfigs, Server
import org.apache.kafka.server.logger.LoggingController
import org.apache.kafka.server.metrics.ClientMetricsTestUtils
import org.apache.kafka.server.share.{CachedSharePartition,
ErroneousAndValidPartitionData, SharePartitionKey}
-import org.apache.kafka.server.quota.{ClientQuotaManager,
ControllerMutationQuota, ControllerMutationQuotaManager, ThrottleCallback}
+import org.apache.kafka.server.quota.{ClientQuotaManager,
ControllerMutationQuota, ControllerMutationQuotaManager, ReplicaQuota,
ReplicationQuotaManager, ThrottleCallback}
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch
import org.apache.kafka.server.share.context.{FinalContext,
ShareSessionContext}
import org.apache.kafka.server.share.session.{ShareSession, ShareSessionKey}
diff --git
a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
index f16d72a3030..e7db8a743f7 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
@@ -32,6 +32,7 @@ import org.apache.kafka.metadata.KRaftMetadataCache
import org.apache.kafka.server.{PartitionFetchState, ReplicaState, common}
import org.apache.kafka.server.common.{DirectoryEventHandler, KRaftVersion,
OffsetAndEpoch}
import org.apache.kafka.server.network.BrokerEndPoint
+import org.apache.kafka.server.quota.{ReplicaQuota, ReplicationQuotaManager}
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams,
FetchPartitionData}
import org.apache.kafka.storage.internals.log.UnifiedLog
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
diff --git
a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index b77e36ad184..3bd2fc5b133 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -39,6 +39,7 @@ import org.apache.kafka.server.network.BrokerEndPoint
import org.apache.kafka.server.ReplicaState
import org.apache.kafka.server.PartitionFetchState
import org.apache.kafka.server.config.ReplicationConfigs
+import org.apache.kafka.server.quota.{ReplicaQuota, ReplicationQuotaManager}
import org.apache.kafka.storage.internals.log.{LogAppendInfo, LogConfig,
RecordValidationStats, UnifiedLog}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions._
diff --git
a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
index cc532d05e00..3386a8eee86 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
@@ -43,6 +43,7 @@ import org.apache.kafka.raft.{KRaftConfigs, QuorumConfig}
import org.apache.kafka.server.HostedPartition
import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion,
TopicIdPartition}
import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
+import org.apache.kafka.server.quota.ReplicationQuotaManager
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams,
FetchPartitionData}
import org.apache.kafka.server.util.{MockTime, ShutdownableThread}
import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig,
LogDirFailureChannel}
diff --git
a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index a2e5128f30b..2f5e27464db 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -30,6 +30,7 @@ import
org.apache.kafka.common.requests.FetchRequest.PartitionData
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.metadata.{KRaftMetadataCache, LeaderRecoveryState}
import org.apache.kafka.server.common.KRaftVersion
+import org.apache.kafka.server.quota.ReplicaQuota
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams}
import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
import org.apache.kafka.storage.internals.log.{FetchDataInfo,
FetchPartitionStatus, LogConfig, LogDirFailureChannel, LogOffsetMetadata,
LogOffsetSnapshot, UnifiedLog}
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index b83495ea3db..f92551ef1a9 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -67,6 +67,7 @@ import org.apache.kafka.server.log.remote.storage._
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
import org.apache.kafka.server.network.BrokerEndPoint
import org.apache.kafka.server.purgatory.{DelayedDeleteRecords,
DelayedOperationPurgatory, DelayedProduce, DelayedRemoteFetch,
DelayedRemoteListOffsets}
+import org.apache.kafka.server.quota.{ReplicaQuota, ReplicationQuotaManager}
import org.apache.kafka.server.{HostedPartition, PartitionFetchState}
import org.apache.kafka.server.share.SharePartitionKey
import org.apache.kafka.server.share.fetch.{DelayedShareFetchGroupKey,
DelayedShareFetchKey, ShareFetch}
diff --git
a/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala
deleted file mode 100644
index acb5c22ec76..00000000000
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.server
-
-import java.util.Collections
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.metrics.{MetricConfig, Metrics, Quota}
-import org.apache.kafka.common.utils.MockTime
-import org.apache.kafka.server.config.ReplicationQuotaManagerConfig
-import org.apache.kafka.server.quota.QuotaType
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
-import org.junit.jupiter.api.{AfterEach, Test}
-
-import scala.jdk.CollectionConverters._
-
-class ReplicationQuotaManagerTest {
- private val time = new MockTime
- private val metrics = new Metrics(new MetricConfig(),
Collections.emptyList(), time)
-
- @AfterEach
- def tearDown(): Unit = {
- metrics.close()
- }
-
- @Test
- def shouldThrottleOnlyDefinedReplicas(): Unit = {
- val quota = new ReplicationQuotaManager(new
ReplicationQuotaManagerConfig(), metrics, QuotaType.FETCH, time)
- quota.markThrottled("topic1", java.util.List.of(1, 2, 3))
-
- assertTrue(quota.isThrottled(tp1(1)))
- assertTrue(quota.isThrottled(tp1(2)))
- assertTrue(quota.isThrottled(tp1(3)))
- assertFalse(quota.isThrottled(tp1(4)))
- }
-
- @Test
- def shouldExceedQuotaThenReturnBackBelowBoundAsTimePasses(): Unit = {
- val quota = new ReplicationQuotaManager(new
ReplicationQuotaManagerConfig(10, 1), metrics, QuotaType.LEADER_REPLICATION,
time)
-
- //Given
- quota.updateQuota(new Quota(100, true))
-
- //Quota should not be broken when we start
- assertFalse(quota.isQuotaExceeded)
-
- //First window is fixed, so we'll skip it
- time.sleep(1000)
-
- //When we record up to the quota value after half a window
- time.sleep(500)
- quota.record(1)
-
- //Then it should not break the quota
- assertFalse(quota.isQuotaExceeded)
-
- //When we record half the quota (half way through the window), we still
should not break
- quota.record(149) //150B, 1.5s
- assertFalse(quota.isQuotaExceeded)
-
- //Add a byte to push over quota
- quota.record(1) //151B, 1.5s
-
- //Then it should break the quota
- assertEquals(151 / 1.5, rate(metrics), 0) //151B, 1.5s
- assertTrue(quota.isQuotaExceeded)
-
- //When we sleep for the remaining half the window
- time.sleep(500) //151B, 2s
-
- //Then Our rate should have halved (i.e back down below the quota)
- assertFalse(quota.isQuotaExceeded)
- assertEquals(151d / 2, rate(metrics), 0.1) //151B, 2s
-
- //When we sleep for another half a window (now half way through second
window)
- time.sleep(500)
- quota.record(99) //250B, 2.5s
-
- //Then the rate should be exceeded again
- assertEquals(250 / 2.5, rate(metrics), 0) //250B, 2.5s
- assertFalse(quota.isQuotaExceeded)
- quota.record(1)
- assertTrue(quota.isQuotaExceeded)
- assertEquals(251 / 2.5, rate(metrics), 0)
-
- //Sleep for 2 more window
- time.sleep(2 * 1000) //so now at 3.5s
- assertFalse(quota.isQuotaExceeded)
- assertEquals(251d / 4.5, rate(metrics), 0)
- }
-
- def rate(metrics: Metrics): Double = {
- val metricName = metrics.metricName("byte-rate",
QuotaType.LEADER_REPLICATION.toString, "Tracking byte-rate for " +
QuotaType.LEADER_REPLICATION)
- val leaderThrottledRate =
metrics.metrics.asScala(metricName).metricValue.asInstanceOf[Double]
- leaderThrottledRate
- }
-
- @Test
- def shouldSupportWildcardThrottledReplicas(): Unit = {
- val quota = new ReplicationQuotaManager(new
ReplicationQuotaManagerConfig(), metrics, QuotaType.LEADER_REPLICATION, time)
-
- //When
- quota.markThrottled("MyTopic")
-
- //Then
- assertTrue(quota.isThrottled(new TopicPartition("MyTopic", 0)))
- assertFalse(quota.isThrottled(new TopicPartition("MyOtherTopic", 0)))
- }
-
- private def tp1(id: Int): TopicPartition = new TopicPartition("topic1", id)
-}
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index 75e968c58d8..ade72c71601 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -29,7 +29,6 @@ import kafka.server.QuotaFactory;
import kafka.server.RemoteLeaderEndPoint;
import kafka.server.ReplicaFetcherThread;
import kafka.server.ReplicaManager;
-import kafka.server.ReplicaQuota;
import kafka.server.builders.LogManagerBuilder;
import kafka.server.builders.ReplicaManagerBuilder;
@@ -59,6 +58,7 @@ import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.OffsetAndEpoch;
import org.apache.kafka.server.network.BrokerEndPoint;
+import org.apache.kafka.server.quota.ReplicaQuota;
import org.apache.kafka.server.util.KafkaScheduler;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints;
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
index 5b0cf9de898..ce91134e1a6 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
@@ -26,7 +26,6 @@ import kafka.server.KafkaApis;
import kafka.server.KafkaConfig;
import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager;
-import kafka.server.ReplicationQuotaManager;
import kafka.server.builders.KafkaApisBuilder;
import kafka.server.share.SharePartitionManager;
@@ -65,6 +64,7 @@ import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.quota.ClientQuotaManager;
import org.apache.kafka.server.quota.ControllerMutationQuotaManager;
+import org.apache.kafka.server.quota.ReplicationQuotaManager;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
import org.mockito.Mockito;
diff --git a/core/src/main/java/kafka/server/ReplicaQuota.java
b/server/src/main/java/org/apache/kafka/server/quota/ReplicaQuota.java
similarity index 96%
rename from core/src/main/java/kafka/server/ReplicaQuota.java
rename to server/src/main/java/org/apache/kafka/server/quota/ReplicaQuota.java
index 7d65ba25001..97efae8b94b 100644
--- a/core/src/main/java/kafka/server/ReplicaQuota.java
+++ b/server/src/main/java/org/apache/kafka/server/quota/ReplicaQuota.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package kafka.server;
+package org.apache.kafka.server.quota;
import org.apache.kafka.common.TopicPartition;
diff --git a/core/src/main/java/kafka/server/ReplicationQuotaManager.java
b/server/src/main/java/org/apache/kafka/server/quota/ReplicationQuotaManager.java
similarity index 97%
rename from core/src/main/java/kafka/server/ReplicationQuotaManager.java
rename to
server/src/main/java/org/apache/kafka/server/quota/ReplicationQuotaManager.java
index 9301e59dfe6..c0d416f955a 100644
--- a/core/src/main/java/kafka/server/ReplicationQuotaManager.java
+++
b/server/src/main/java/org/apache/kafka/server/quota/ReplicationQuotaManager.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package kafka.server;
+package org.apache.kafka.server.quota;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
@@ -27,8 +27,6 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.SimpleRate;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.config.ReplicationQuotaManagerConfig;
-import org.apache.kafka.server.quota.QuotaType;
-import org.apache.kafka.server.quota.SensorAccess;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/server/src/test/java/org/apache/kafka/server/quota/ReplicationQuotaManagerTest.java
b/server/src/test/java/org/apache/kafka/server/quota/ReplicationQuotaManagerTest.java
new file mode 100644
index 00000000000..17cf9492f4f
--- /dev/null
+++
b/server/src/test/java/org/apache/kafka/server/quota/ReplicationQuotaManagerTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.quota;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Quota;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.server.config.ReplicationQuotaManagerConfig;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ReplicationQuotaManagerTest {
+
+ private final MockTime time = new MockTime();
+ private final Metrics metrics = new Metrics(new MetricConfig(), List.of(),
time);
+
+ @AfterEach
+ public void tearDown() {
+ metrics.close();
+ }
+
+ @Test
+ public void shouldThrottleOnlyDefinedReplicas() {
+ ReplicationQuotaManager quota = new ReplicationQuotaManager(new
ReplicationQuotaManagerConfig(), metrics, QuotaType.FETCH, time);
+ quota.markThrottled("topic1", List.of(1, 2, 3));
+
+ assertTrue(quota.isThrottled(new TopicPartition("topic1", 1)));
+ assertTrue(quota.isThrottled(new TopicPartition("topic1", 2)));
+ assertTrue(quota.isThrottled(new TopicPartition("topic1", 3)));
+ assertFalse(quota.isThrottled(new TopicPartition("topic1", 4)));
+ }
+
+ @Test
+ public void shouldExceedQuotaThenReturnBackBelowBoundAsTimePasses() {
+ ReplicationQuotaManager quota = new ReplicationQuotaManager(new
ReplicationQuotaManagerConfig(10, 1), metrics, QuotaType.LEADER_REPLICATION,
time);
+
+ //Given
+ quota.updateQuota(new Quota(100, true));
+
+ //Quota should not be broken when we start
+ assertFalse(quota.isQuotaExceeded());
+
+ //First window is fixed, so we'll skip it
+ time.sleep(1000);
+
+ //When we record up to the quota value after half a window
+ time.sleep(500);
+ quota.record(1);
+
+ //Then it should not break the quota
+ assertFalse(quota.isQuotaExceeded());
+
+ //When we record half the quota (halfway through the window), we still
should not break
+ quota.record(149); //150B, 1.5s
+ assertFalse(quota.isQuotaExceeded());
+
+ //Add a byte to push over quota
+ quota.record(1); //151B, 1.5s
+
+ //Then it should break the quota
+ assertEquals(151 / 1.5, rate(metrics), 0); //151B, 1.5s
+ assertTrue(quota.isQuotaExceeded());
+
+ //When we sleep for the remaining half the window
+ time.sleep(500); //151B, 2s
+
+ //Then Our rate should have halved (i.e. back down below the quota)
+ assertFalse(quota.isQuotaExceeded());
+ assertEquals(151d / 2, rate(metrics), 0.1); //151B, 2s
+
+ //When we sleep for another half a window (now halfway through second
window)
+ time.sleep(500);
+ quota.record(99); //250B, 2.5s
+
+ //Then the rate should be exceeded again
+ assertEquals(250 / 2.5, rate(metrics), 0); //250B, 2.5s
+ assertFalse(quota.isQuotaExceeded());
+ quota.record(1);
+ assertTrue(quota.isQuotaExceeded());
+ assertEquals(251 / 2.5, rate(metrics), 0);
+
+ //Sleep for 2 more window
+ time.sleep(2 * 1000); //so now at 3.5s
+ assertFalse(quota.isQuotaExceeded());
+ assertEquals(251 / 4.5, rate(metrics), 0);
+ }
+
+ private double rate(Metrics metrics) {
+ MetricName metricName = metrics.metricName("byte-rate",
QuotaType.LEADER_REPLICATION.toString(), "Tracking byte-rate for " +
QuotaType.LEADER_REPLICATION);
+ return (double) metrics.metrics().get(metricName).metricValue();
+ }
+
+ @Test
+ public void shouldSupportWildcardThrottledReplicas() {
+ ReplicationQuotaManager quota = new ReplicationQuotaManager(new
ReplicationQuotaManagerConfig(), metrics, QuotaType.LEADER_REPLICATION, time);
+
+ //When
+ quota.markThrottled("MyTopic");
+
+ //Then
+ assertTrue(quota.isThrottled(new TopicPartition("MyTopic", 0)));
+ assertFalse(quota.isThrottled(new TopicPartition("MyOtherTopic", 0)));
+ }
+
+}