This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c0e77d16260 KAFKA-19914: Move AlterPartitionManager to server module
(#21432)
c0e77d16260 is described below
commit c0e77d1626078c8e0b7c3021cb2ecae2ed325e11
Author: Chih-Yuan Chien <[email protected]>
AuthorDate: Fri Apr 10 18:41:21 2026 +0800
KAFKA-19914: Move AlterPartitionManager to server module (#21432)
Migrate AlterPartitionManager from Scala to Java in the
`org.apache.kafka.server.partition` package, including the interface,
`DefaultAlterPartitionManager` implementation, and `AlterPartitionItem`
record. Also migrate `AlterPartitionManagerTest` and move
`MockAlterPartitionManager` from `TestUtils.scala` to
`org.apache.kafka.server.util`.
Reviewers: Mickael Maison <[email protected]>
---
build.gradle | 1 +
.../server/builders/ReplicaManagerBuilder.java | 2 +-
core/src/main/scala/kafka/cluster/Partition.scala | 2 +-
.../scala/kafka/server/AlterPartitionManager.scala | 308 ------------
.../src/main/scala/kafka/server/BrokerServer.scala | 9 +-
.../main/scala/kafka/server/ReplicaManager.scala | 2 +-
.../kafka/server/LocalLeaderEndPointTest.scala | 1 +
.../unit/kafka/cluster/AbstractPartitionTest.scala | 4 +-
.../unit/kafka/cluster/PartitionLockTest.scala | 3 +-
.../scala/unit/kafka/cluster/PartitionTest.scala | 33 +-
.../kafka/server/AlterPartitionManagerTest.scala | 533 --------------------
.../server/HighwatermarkPersistenceTest.scala | 4 +-
.../unit/kafka/server/IsrExpirationTest.scala | 5 +-
.../server/ReplicaManagerConcurrencyTest.scala | 5 +
.../kafka/server/ReplicaManagerQuotasTest.scala | 1 +
.../unit/kafka/server/ReplicaManagerTest.scala | 1 +
.../server/epoch/OffsetsForLeaderEpochTest.scala | 4 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 52 +-
.../jmh/fetcher/ReplicaFetcherThreadBenchmark.java | 2 +-
.../partition/PartitionMakeFollowerBenchmark.java | 2 +-
.../UpdateFollowerFetchStateBenchmark.java | 2 +-
.../apache/kafka/jmh/server/CheckpointBench.java | 2 +-
.../kafka/jmh/server/PartitionCreationBench.java | 2 +-
.../kafka/server/partition/AlterPartitionItem.java | 27 +
.../server/partition/AlterPartitionManager.java | 38 ++
.../partition/DefaultAlterPartitionManager.java | 287 +++++++++++
.../partition/AlterPartitionManagerTest.java | 543 +++++++++++++++++++++
.../server/util/MockAlterPartitionManager.java | 80 +++
28 files changed, 1024 insertions(+), 931 deletions(-)
diff --git a/build.gradle b/build.gradle
index 36edce22f3f..6570cd7bbb2 100644
--- a/build.gradle
+++ b/build.gradle
@@ -3446,6 +3446,7 @@ project(':jmh-benchmarks') {
implementation project(':connect:json')
implementation project(':clients').sourceSets.test.output
implementation project(':server-common').sourceSets.test.output
+ implementation project(':server').sourceSets.test.output
implementation project(':metadata').sourceSets.test.output
implementation libs.jmhCore
diff --git
a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
index 1eecd995024..561447d9f97 100644
--- a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
+++ b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
@@ -17,7 +17,6 @@
package kafka.server.builders;
-import kafka.server.AlterPartitionManager;
import kafka.server.KafkaConfig;
import kafka.server.QuotaFactory.QuotaManagers;
import kafka.server.ReplicaManager;
@@ -27,6 +26,7 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.metadata.MetadataCache;
import org.apache.kafka.server.DelayedActionQueue;
import org.apache.kafka.server.common.DirectoryEventHandler;
+import org.apache.kafka.server.partition.AlterPartitionManager;
import org.apache.kafka.server.util.Scheduler;
import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
import org.apache.kafka.storage.internals.log.LogManager;
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala
b/core/src/main/scala/kafka/cluster/Partition.scala
index f303b0fede7..49cd785355b 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -43,7 +43,7 @@ import
org.apache.kafka.server.log.remote.storage.RemoteLogManager
import org.apache.kafka.storage.internals.log.{AppendOrigin,
AsyncOffsetReader, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogManager,
LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo,
LogStartOffsetIncrementReason, OffsetResultHolder, UnifiedLog,
VerificationGuard}
import org.apache.kafka.common.metrics.internals.MetricsUtils
import org.apache.kafka.server.metrics.KafkaMetricsGroup
-import org.apache.kafka.server.partition.{AlterPartitionListener,
AssignmentState, CommittedPartitionState, OngoingReassignmentState,
PartitionListener, PartitionState, PendingExpandIsr, PendingPartitionChange,
PendingShrinkIsr, SimpleAssignmentState}
+import org.apache.kafka.server.partition.{AlterPartitionListener,
AlterPartitionManager, AssignmentState, CommittedPartitionState,
OngoingReassignmentState, PartitionListener, PartitionState, PendingExpandIsr,
PendingPartitionChange, PendingShrinkIsr, SimpleAssignmentState}
import org.apache.kafka.server.purgatory.{DelayedDeleteRecords,
DelayedOperationPurgatory, DelayedProduce, TopicPartitionOperationKey}
import org.apache.kafka.server.replica.Replica
import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey
diff --git a/core/src/main/scala/kafka/server/AlterPartitionManager.scala
b/core/src/main/scala/kafka/server/AlterPartitionManager.scala
deleted file mode 100644
index 3b060a35d8f..00000000000
--- a/core/src/main/scala/kafka/server/AlterPartitionManager.scala
+++ /dev/null
@@ -1,308 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.server
-
-import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.{CompletableFuture, ConcurrentHashMap}
-import kafka.utils.Logging
-import org.apache.kafka.clients.ClientResponse
-import org.apache.kafka.common.errors.OperationNotAttemptedException
-import org.apache.kafka.common.message.AlterPartitionRequestData
-import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.{AlterPartitionRequest,
AlterPartitionResponse}
-import org.apache.kafka.common.utils.Time
-import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState}
-import org.apache.kafka.server.common.{ControllerRequestCompletionHandler,
NodeToControllerChannelManager, TopicIdPartition}
-import org.apache.kafka.server.util.Scheduler
-import org.apache.kafka.server.{ControllerInformation,
NodeToControllerChannelManagerImpl}
-
-import java.util.function.Supplier
-import scala.collection.mutable
-import scala.collection.mutable.ListBuffer
-import scala.jdk.OptionConverters.RichOptional
-
-/**
- * Handles updating the ISR by sending AlterPartition requests to the
controller. Updating the ISR is an asynchronous
- * operation, so partitions will learn about the result of their request
through a callback.
- *
- * Note that ISR state changes can still be initiated by the controller and
sent to the partitions via LeaderAndIsr
- * requests.
- */
-trait AlterPartitionManager {
- def start(): Unit = {}
-
- def shutdown(): Unit = {}
-
- def submit(
- topicIdPartition: TopicIdPartition,
- leaderAndIsr: LeaderAndIsr
- ): CompletableFuture[LeaderAndIsr]
-}
-
-case class AlterPartitionItem(
- topicIdPartition: TopicIdPartition,
- leaderAndIsr: LeaderAndIsr,
- future: CompletableFuture[LeaderAndIsr]
-)
-
-object AlterPartitionManager {
-
- /**
- * Factory to AlterPartition based implementation
- */
- def apply(
- config: KafkaConfig,
- scheduler: Scheduler,
- controllerNodeProvider: Supplier[ControllerInformation],
- time: Time,
- metrics: Metrics,
- threadNamePrefix: String,
- brokerEpochSupplier: () => Long,
- ): AlterPartitionManager = {
- val channelManager = new NodeToControllerChannelManagerImpl(
- controllerNodeProvider,
- time,
- metrics,
- config,
- "alter-partition",
- threadNamePrefix,
- Long.MaxValue
- )
- new DefaultAlterPartitionManager(
- controllerChannelManager = channelManager,
- scheduler = scheduler,
- time = time,
- brokerId = config.brokerId,
- brokerEpochSupplier = brokerEpochSupplier
- )
- }
-}
-
-class DefaultAlterPartitionManager(
- val controllerChannelManager: NodeToControllerChannelManager,
- val scheduler: Scheduler,
- val time: Time,
- val brokerId: Int,
- val brokerEpochSupplier: () => Long,
-) extends AlterPartitionManager with Logging {
-
- // Used to allow only one pending ISR update per partition (visible for
testing)
- private[server] val unsentIsrUpdates = new
ConcurrentHashMap[TopicIdPartition, AlterPartitionItem]()
-
- // Used to allow only one in-flight request at a time
- private val inflightRequest: AtomicBoolean = new AtomicBoolean(false)
-
- override def start(): Unit = {
- controllerChannelManager.start()
- }
-
- override def shutdown(): Unit = {
- controllerChannelManager.shutdown()
- }
-
- override def submit(
- topicIdPartition: TopicIdPartition,
- leaderAndIsr: LeaderAndIsr
- ): CompletableFuture[LeaderAndIsr] = {
- val future = new CompletableFuture[LeaderAndIsr]()
- val alterPartitionItem = AlterPartitionItem(topicIdPartition,
leaderAndIsr, future)
- val enqueued =
unsentIsrUpdates.putIfAbsent(alterPartitionItem.topicIdPartition,
alterPartitionItem) == null
- if (enqueued) {
- maybePropagateIsrChanges()
- } else {
- future.completeExceptionally(new OperationNotAttemptedException(
- s"Failed to enqueue ISR change state $leaderAndIsr for partition
$topicIdPartition"))
- }
- future
- }
-
- private[server] def maybePropagateIsrChanges(): Unit = {
- // Send all pending items if there is not already a request in-flight.
- if (!unsentIsrUpdates.isEmpty && inflightRequest.compareAndSet(false,
true)) {
- // Copy current unsent ISRs but don't remove from the map, they get
cleared in the response handler
- val inflightAlterPartitionItems = new ListBuffer[AlterPartitionItem]()
- unsentIsrUpdates.values.forEach(item =>
inflightAlterPartitionItems.append(item))
- sendRequest(inflightAlterPartitionItems.toSeq)
- }
- }
-
- private[server] def clearInFlightRequest(): Unit = {
- if (!inflightRequest.compareAndSet(true, false)) {
- warn("Attempting to clear AlterPartition in-flight flag when no apparent
request is in-flight")
- }
- }
-
- private def sendRequest(inflightAlterPartitionItems:
Seq[AlterPartitionItem]): Unit = {
- val brokerEpoch = brokerEpochSupplier()
- val request = buildRequest(inflightAlterPartitionItems, brokerEpoch)
- debug(s"Sending AlterPartition to controller $request")
-
- // We will not time out AlterPartition request, instead letting it retry
indefinitely
- // until a response is received, or a new LeaderAndIsr overwrites the
existing isrState
- // which causes the response for those partitions to be ignored.
- controllerChannelManager.sendRequest(request,
- new ControllerRequestCompletionHandler {
- override def onComplete(response: ClientResponse): Unit = {
- debug(s"Received AlterPartition response $response")
- val error = try {
- if (response.authenticationException != null) {
- // For now, we treat authentication errors as retriable. We use
the
- // `NETWORK_EXCEPTION` error code for lack of a good alternative.
- // Note that `NodeToControllerChannelManager` will still log the
- // authentication errors so that users have a chance to fix the
problem.
- Errors.NETWORK_EXCEPTION
- } else if (response.versionMismatch != null) {
- Errors.UNSUPPORTED_VERSION
- } else {
- handleAlterPartitionResponse(
- response.responseBody.asInstanceOf[AlterPartitionResponse],
- brokerEpoch,
- inflightAlterPartitionItems
- )
- }
- } finally {
- // clear the flag so future requests can proceed
- clearInFlightRequest()
- }
-
- // check if we need to send another request right away
- error match {
- case Errors.NONE =>
- // In the normal case, check for pending updates to send
immediately
- maybePropagateIsrChanges()
- case _ =>
- // If we received a top-level error from the controller, retry
the request in the near future
- scheduler.scheduleOnce("send-alter-partition", () =>
maybePropagateIsrChanges(), 50)
- }
- }
-
- override def onTimeout(): Unit = {
- throw new IllegalStateException("Encountered unexpected timeout when
sending AlterPartition to the controller")
- }
- })
- }
-
- /**
- * Builds an AlterPartition request.
- *
- * While building the request, we don't know which version of the
AlterPartition API is
- * supported by the controller. The final decision is taken when the
AlterPartitionRequest
- * is built in the network client based on the advertised api versions of
the controller.
- *
- * @return an AlterPartitionRequest.Builder with the provided parameters.
- */
- private def buildRequest(
- inflightAlterPartitionItems: Seq[AlterPartitionItem],
- brokerEpoch: Long
- ): AlterPartitionRequest.Builder = {
- val message = new AlterPartitionRequestData()
- .setBrokerId(brokerId)
- .setBrokerEpoch(brokerEpoch)
-
- inflightAlterPartitionItems.groupBy(_.topicIdPartition.topicId).foreach {
case (topicId, items) =>
- val topicData = new
AlterPartitionRequestData.TopicData().setTopicId(topicId)
- message.topics.add(topicData)
-
- items.foreach { item =>
- val partitionData = new AlterPartitionRequestData.PartitionData()
- .setPartitionIndex(item.topicIdPartition.partitionId)
- .setLeaderEpoch(item.leaderAndIsr.leaderEpoch)
- .setNewIsrWithEpochs(item.leaderAndIsr.isrWithBrokerEpoch)
- .setPartitionEpoch(item.leaderAndIsr.partitionEpoch)
-
-
partitionData.setLeaderRecoveryState(item.leaderAndIsr.leaderRecoveryState.value)
-
- topicData.partitions.add(partitionData)
- }
- }
-
- new AlterPartitionRequest.Builder(message)
- }
-
- private def handleAlterPartitionResponse(
- alterPartitionResp: AlterPartitionResponse,
- sentBrokerEpoch: Long,
- inflightAlterPartitionItems: Seq[AlterPartitionItem],
- ): Errors = {
- val data = alterPartitionResp.data
-
- Errors.forCode(data.errorCode) match {
- case Errors.STALE_BROKER_EPOCH =>
- warn(s"Broker had a stale broker epoch ($sentBrokerEpoch), retrying.")
-
- case Errors.CLUSTER_AUTHORIZATION_FAILED =>
- error(s"Broker is not authorized to send AlterPartition to controller",
- Errors.CLUSTER_AUTHORIZATION_FAILED.exception("Broker is not
authorized to send AlterPartition to controller"))
-
- case Errors.NONE =>
- // Collect partition-level responses to pass to the callbacks
- val partitionResponses = new mutable.HashMap[TopicIdPartition,
Either[Errors, LeaderAndIsr]]()
- data.topics.forEach { topic =>
- topic.partitions.forEach { partition =>
- val tp = new TopicIdPartition(topic.topicId,
partition.partitionIndex)
- val apiError = Errors.forCode(partition.errorCode)
- debug(s"Controller successfully handled AlterPartition request for
$tp: $partition")
- if (apiError == Errors.NONE) {
-
LeaderRecoveryState.optionalOf(partition.leaderRecoveryState).toScala match {
- case Some(leaderRecoveryState) =>
- partitionResponses(tp) = Right(
- new LeaderAndIsr(
- partition.leaderId,
- partition.leaderEpoch,
- partition.isr,
- leaderRecoveryState,
- partition.partitionEpoch
- )
- )
-
- case None =>
- error(s"Controller returned an invalid leader recovery state
(${partition.leaderRecoveryState}) for $tp: $partition")
- partitionResponses(tp) = Left(Errors.UNKNOWN_SERVER_ERROR)
- }
- } else {
- partitionResponses(tp) = Left(apiError)
- }
- }
- }
-
- // Iterate across the items we sent rather than what we received to
ensure we run the callback even if a
- // partition was somehow erroneously excluded from the response. Note
that these callbacks are run from
- // the leaderIsrUpdateLock write lock in
Partition#sendAlterPartitionRequest
- inflightAlterPartitionItems.foreach { inflightAlterPartition =>
- partitionResponses.get(inflightAlterPartition.topicIdPartition)
match {
- case Some(leaderAndIsrOrError) =>
- // Regardless of callback outcome, we need to clear from the
unsent updates map to unblock further
- // updates. We clear it now to allow the callback to submit a
new update if needed.
- unsentIsrUpdates.remove(inflightAlterPartition.topicIdPartition)
- leaderAndIsrOrError match {
- case Left(error) =>
inflightAlterPartition.future.completeExceptionally(error.exception)
- case Right(leaderAndIsr) =>
inflightAlterPartition.future.complete(leaderAndIsr)
- }
- case None =>
- // Don't remove this partition from the update map so it will
get re-sent
- warn(s"Partition ${inflightAlterPartition.topicIdPartition} was
sent but not included in the response")
- }
- }
-
- case e =>
- warn(s"Controller returned an unexpected top-level error when handling
AlterPartition request: $e")
- }
-
- Errors.forCode(data.errorCode)
- }
-}
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 102981827ce..fe0fd1d2ec5 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -60,6 +60,7 @@ import org.apache.kafka.server.{AssignmentsManager,
BrokerFeatures, BrokerLifecy
import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
import org.apache.kafka.storage.internals.log.{LogDirFailureChannel,
LogManager => JLogManager}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
+import org.apache.kafka.server.partition.{AlterPartitionManager,
DefaultAlterPartitionManager}
import java.time.Duration
import java.util
@@ -290,14 +291,14 @@ class BrokerServer(
remoteLogManagerOpt = createRemoteLogManager(listenerInfo)
- alterPartitionManager = AlterPartitionManager(
+ alterPartitionManager = DefaultAlterPartitionManager.create(
config,
- scheduler = kafkaScheduler,
+ kafkaScheduler,
controllerNodeProvider,
- time = time,
+ time,
metrics,
s"broker-${config.nodeId}-",
- brokerEpochSupplier = () => lifecycleManager.brokerEpoch
+ () => lifecycleManager.brokerEpoch
)
alterPartitionManager.start()
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 50d6f0ff4a3..8b0d3950c17 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -56,7 +56,7 @@ import org.apache.kafka.server.config.ReplicationConfigs
import org.apache.kafka.server.log.remote.storage.RemoteLogManager
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.network.BrokerEndPoint
-import org.apache.kafka.server.partition.PartitionListener
+import org.apache.kafka.server.partition.{AlterPartitionManager,
PartitionListener}
import
org.apache.kafka.server.purgatory.DelayedProduce.PartitionStatusValidator.Result
import org.apache.kafka.server.purgatory.{DelayedDeleteRecords,
DelayedOperationPurgatory, DelayedProduce, DelayedRemoteFetch,
DelayedRemoteListOffsets, DeleteRecordsPartitionStatus,
ListOffsetsPartitionStatus, TopicPartitionOperationKey}
import org.apache.kafka.server.quota.{ReplicaQuota, ReplicationQuotaManager}
diff --git a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
index 3d8e4a19d60..71f33100094 100644
--- a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
+++ b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
@@ -35,6 +35,7 @@ import org.apache.kafka.metadata.KRaftMetadataCache
import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion,
OffsetAndEpoch}
import org.apache.kafka.server.network.BrokerEndPoint
import org.apache.kafka.server.LeaderEndPoint
+import org.apache.kafka.server.partition.AlterPartitionManager
import org.apache.kafka.server.util.{MockScheduler, MockTime}
import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig,
LogDirFailureChannel}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
diff --git a/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
b/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
index 7ec1bc56e3a..2375edb43ba 100644
--- a/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
@@ -17,7 +17,6 @@
package kafka.cluster
import kafka.utils.TestUtils
-import kafka.utils.TestUtils.MockAlterPartitionManager
import org.apache.kafka.common.{DirectoryId, TopicPartition, Uuid}
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.utils.Utils
@@ -26,6 +25,7 @@ import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.config.ReplicationConfigs
import org.apache.kafka.server.partition.AlterPartitionListener
import org.apache.kafka.server.util.MockTime
+import org.apache.kafka.server.util.MockAlterPartitionManager
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig,
LogManager}
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
@@ -78,7 +78,7 @@ class AbstractPartitionTest {
new CleanerConfig(false), time, transactionVerificationEnabled = true)
logManager.startup(util.Set.of)
- alterPartitionManager = TestUtils.createAlterIsrManager()
+ alterPartitionManager = new MockAlterPartitionManager()
alterPartitionListener = createIsrChangeListener()
partition = new Partition(topicPartition,
replicaLagTimeMaxMs = ReplicationConfigs.REPLICA_LAG_TIME_MAX_MS_DEFAULT,
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
index d425b08932d..1c2f19c95c7 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
@@ -22,7 +22,6 @@ import java.util
import java.util.{Optional, Properties}
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicBoolean
-import kafka.server._
import kafka.utils._
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.record.internal.{MemoryRecords, SimpleRecord}
@@ -33,7 +32,7 @@ import
org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState,
MetadataCache, MockConfigRepository, PartitionRegistration}
import org.apache.kafka.server.common.{RequestLocal, TopicIdPartition}
import org.apache.kafka.server.config.ReplicationConfigs
-import org.apache.kafka.server.partition.{AlterPartitionListener,
CommittedPartitionState, PendingShrinkIsr}
+import org.apache.kafka.server.partition.{AlterPartitionListener,
AlterPartitionManager, CommittedPartitionState, PendingShrinkIsr}
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams}
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 0f4a94cb05f..40f385e6a22 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -54,7 +54,7 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal,
SecurityProtocol}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.common.{ControllerRequestCompletionHandler,
NodeToControllerChannelManager, RequestLocal}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
-import org.apache.kafka.server.partition.{AlterPartitionListener,
OngoingReassignmentState, PartitionListener, PendingShrinkIsr,
SimpleAssignmentState}
+import org.apache.kafka.server.partition.{AlterPartitionListener,
AlterPartitionManager, DefaultAlterPartitionManager, OngoingReassignmentState,
PartitionListener, PendingShrinkIsr, SimpleAssignmentState}
import org.apache.kafka.server.purgatory.{DelayedDeleteRecords,
DelayedOperationPurgatory, DelayedProduce, TopicPartitionOperationKey}
import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams,
UnexpectedAppendOffsetException}
@@ -1270,7 +1270,7 @@ class PartitionTest extends AbstractPartitionTest {
// Expansion does not affect the ISR
assertEquals(util.Set.of(leader, follower2), partition.partitionState.isr,
"ISR")
assertEquals(util.Set.of(leader, follower1, follower2),
partition.partitionState.maximalIsr, "ISR")
-
assertEquals(alterPartitionManager.isrUpdates.head.leaderAndIsr.isr.asScala.toSet,
+
assertEquals(alterPartitionManager.isrUpdates.peek().leaderAndIsr.isr.asScala.toSet,
Set(leader, follower1, follower2), "AlterIsr")
}
@@ -1511,7 +1511,7 @@ class PartitionTest extends AbstractPartitionTest {
assertEquals(Set(brokerId), partition.inSyncReplicaIds)
assertEquals(util.Set.of(brokerId, remoteBrokerId),
partition.partitionState.maximalIsr)
assertEquals(1, alterPartitionManager.isrUpdates.size)
- assertEquals(Set(brokerId, remoteBrokerId),
alterPartitionManager.isrUpdates.head.leaderAndIsr.isr.asScala.toSet)
+ assertEquals(Set(brokerId, remoteBrokerId),
alterPartitionManager.isrUpdates.peek().leaderAndIsr.isr.asScala.toSet)
// Simulate invalid request failure
alterPartitionManager.failIsrUpdate(Errors.INVALID_REQUEST)
@@ -1566,7 +1566,7 @@ class PartitionTest extends AbstractPartitionTest {
fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = 10L)
assertEquals(alterPartitionManager.isrUpdates.size, 1)
- val isrItem = alterPartitionManager.isrUpdates.head
+ val isrItem = alterPartitionManager.isrUpdates.peek()
assertEquals(isrItem.leaderAndIsr.isr, util.Set.of[Integer](brokerId,
remoteBrokerId))
isrItem.leaderAndIsr.isrWithBrokerEpoch.asScala.foreach { brokerState =>
// the broker epochs should be equal to broker epoch of the leader
@@ -1830,7 +1830,7 @@ class PartitionTest extends AbstractPartitionTest {
assertEquals(1, alterPartitionManager.isrUpdates.size)
// Expansion succeeds.
- alterPartitionManager.completeIsrUpdate(newPartitionEpoch = 1)
+ alterPartitionManager.completeIsrUpdate(1)
// ISR is committed.
assertEquals(replicas.toSet.asJava, partition.partitionState.isr)
@@ -1923,7 +1923,7 @@ class PartitionTest extends AbstractPartitionTest {
assertEquals(isr.toSet.asJava, partition.partitionState.isr)
assertEquals(replicas.toSet.asJava, partition.partitionState.maximalIsr)
assertEquals(1, alterPartitionManager.isrUpdates.size)
- val isrUpdate = alterPartitionManager.isrUpdates.head
+ val isrUpdate = alterPartitionManager.isrUpdates.peek()
isrUpdate.leaderAndIsr.isrWithBrokerEpoch.asScala.foreach { brokerState =>
if (brokerState.brokerId() == remoteBrokerId2) {
// remoteBrokerId2 has not received any fetch request yet, it does not
have broker epoch.
@@ -2085,7 +2085,7 @@ class PartitionTest extends AbstractPartitionTest {
assertEquals(1, alterPartitionManager.isrUpdates.size)
// Expansion succeeds.
- alterPartitionManager.completeIsrUpdate(newPartitionEpoch= 1)
+ alterPartitionManager.completeIsrUpdate(1)
// ISR is committed.
assertEquals(replicas.toSet.asJava, partition.partitionState.isr)
@@ -2121,7 +2121,7 @@ class PartitionTest extends AbstractPartitionTest {
// Try to shrink the ISR
partition.maybeShrinkIsr()
assertEquals(alterPartitionManager.isrUpdates.size, 1)
- assertEquals(alterPartitionManager.isrUpdates.head.leaderAndIsr.isr,
util.Set.of[Integer](brokerId))
+ assertEquals(alterPartitionManager.isrUpdates.peek().leaderAndIsr.isr,
util.Set.of[Integer](brokerId))
assertEquals(util.Set.of(brokerId, remoteBrokerId),
partition.partitionState.isr)
assertEquals(util.Set.of(brokerId, remoteBrokerId),
partition.partitionState.maximalIsr)
@@ -2136,7 +2136,7 @@ class PartitionTest extends AbstractPartitionTest {
assertEquals(0L, partition.localLogOrException.highWatermark)
// The shrink succeeds after retrying
- alterPartitionManager.completeIsrUpdate(newPartitionEpoch = 2)
+ alterPartitionManager.completeIsrUpdate(2)
assertEquals(1, alterPartitionListener.shrinks.get)
assertEquals(2, partition.getPartitionEpoch)
assertEquals(alterPartitionManager.isrUpdates.size, 0)
@@ -2206,8 +2206,8 @@ class PartitionTest extends AbstractPartitionTest {
partition.maybeShrinkIsr()
assertEquals(0, alterPartitionListener.shrinks.get)
assertEquals(alterPartitionManager.isrUpdates.size, 1)
- assertEquals(alterPartitionManager.isrUpdates.head.leaderAndIsr.isr,
util.Set.of[Integer](brokerId, remoteBrokerId1))
- val isrUpdate = alterPartitionManager.isrUpdates.head
+ assertEquals(alterPartitionManager.isrUpdates.peek().leaderAndIsr.isr,
util.Set.of[Integer](brokerId, remoteBrokerId1))
+ val isrUpdate = alterPartitionManager.isrUpdates.peek()
isrUpdate.leaderAndIsr.isrWithBrokerEpoch.asScala.foreach { brokerState =>
assertEquals(defaultBrokerEpoch(brokerState.brokerId()),
brokerState.brokerEpoch())
}
@@ -2217,7 +2217,7 @@ class PartitionTest extends AbstractPartitionTest {
// After the ISR shrink completes, the ISR state should be updated and the
// high watermark should be advanced
- alterPartitionManager.completeIsrUpdate(newPartitionEpoch = 2)
+ alterPartitionManager.completeIsrUpdate(2)
assertEquals(1, alterPartitionListener.shrinks.get)
assertEquals(2, partition.getPartitionEpoch)
assertEquals(alterPartitionManager.isrUpdates.size, 0)
@@ -2601,11 +2601,10 @@ class PartitionTest extends AbstractPartitionTest {
def testPartitionShouldRetryAlterPartitionRequest(): Unit = {
val mockChannelManager = mock(classOf[NodeToControllerChannelManager])
val alterPartitionManager = new DefaultAlterPartitionManager(
- controllerChannelManager = mockChannelManager,
- scheduler = mock(classOf[KafkaScheduler]),
- time = time,
- brokerId = brokerId,
- brokerEpochSupplier = () => 0
+ mockChannelManager,
+ mock(classOf[KafkaScheduler]),
+ brokerId,
+ () => 0
)
partition = new Partition(topicPartition,
diff --git
a/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala
b/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala
deleted file mode 100644
index 4793723bc6a..00000000000
--- a/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala
+++ /dev/null
@@ -1,533 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import java.util.Collections
-import org.apache.kafka.clients.ClientResponse
-import org.apache.kafka.common.Uuid
-import org.apache.kafka.common.errors.{AuthenticationException,
OperationNotAttemptedException, UnknownServerException,
UnsupportedVersionException}
-import org.apache.kafka.common.message.AlterPartitionRequestData.BrokerState
-import org.apache.kafka.common.message.{AlterPartitionRequestData,
AlterPartitionResponseData}
-import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.protocol.MessageUtil
-import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.RequestHeader
-import org.apache.kafka.common.requests.{AbstractRequest,
AlterPartitionRequest, AlterPartitionResponse}
-import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState}
-import org.apache.kafka.server.common.{ControllerRequestCompletionHandler,
NodeToControllerChannelManager, TopicIdPartition}
-import org.apache.kafka.server.util.{MockScheduler, MockTime}
-import org.apache.kafka.test.TestUtils.assertFutureThrows
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.BeforeEach
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.EnumSource
-import org.mockito.ArgumentMatcher
-import org.mockito.ArgumentMatchers.any
-import org.mockito.Mockito.{mock, reset, times, verify}
-import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
-
-import java.util.concurrent.{CompletableFuture, TimeUnit}
-import scala.collection.mutable.ListBuffer
-import scala.jdk.CollectionConverters._
-
-class AlterPartitionManagerTest {
- val topicId = Uuid.randomUuid()
- val time = new MockTime
- val metrics = new Metrics
- val brokerId = 1
-
- var brokerToController: NodeToControllerChannelManager = _
-
- val tp0 = new TopicIdPartition(topicId, 0)
- val tp1 = new TopicIdPartition(topicId, 1)
- val tp2 = new TopicIdPartition(topicId, 2)
-
- @BeforeEach
- def setup(): Unit = {
- brokerToController = mock(classOf[NodeToControllerChannelManager])
- }
-
- @Test
- def testBasic(): Unit = {
- val scheduler = new MockScheduler(time)
- val alterPartitionManager = new
DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, ()
=> 2)
- alterPartitionManager.start()
- alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1, 2,
3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10))
- verify(brokerToController).start()
- verify(brokerToController).sendRequest(any(), any())
- }
-
- @Test
- def testBasicWithBrokerEpoch(): Unit = {
- val scheduler = new MockScheduler(time)
- val alterPartitionManager = new
DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, ()
=> 101)
- alterPartitionManager.start()
- val isrWithBrokerEpoch = ListBuffer[BrokerState]()
- for (ii <- 1 to 3) {
- isrWithBrokerEpoch += new
BrokerState().setBrokerId(ii).setBrokerEpoch(100 + ii)
- }
- alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1,
LeaderRecoveryState.RECOVERED, isrWithBrokerEpoch.toList.asJava, 10))
-
- val expectedAlterPartitionData = new AlterPartitionRequestData()
- .setBrokerId(brokerId)
- .setBrokerEpoch(101)
- val topicData = new AlterPartitionRequestData.TopicData()
- .setTopicId(topicId)
-
- val newIsrWithBrokerEpoch = new ListBuffer[BrokerState]()
- newIsrWithBrokerEpoch.append(new
BrokerState().setBrokerId(1).setBrokerEpoch(101))
- newIsrWithBrokerEpoch.append(new
BrokerState().setBrokerId(2).setBrokerEpoch(102))
- newIsrWithBrokerEpoch.append(new
BrokerState().setBrokerId(3).setBrokerEpoch(103))
- topicData.partitions.add(new AlterPartitionRequestData.PartitionData()
- .setPartitionIndex(0)
- .setLeaderEpoch(1)
- .setPartitionEpoch(10)
- .setNewIsrWithEpochs(newIsrWithBrokerEpoch.toList.asJava))
-
- expectedAlterPartitionData.topics.add(topicData)
-
- verify(brokerToController).start()
- val captor = ArgumentCaptor.forClass(classOf[AbstractRequest.Builder[_ <:
AbstractRequest]])
- verify(brokerToController).sendRequest(captor.capture(), any())
- assertEquals(expectedAlterPartitionData,
captor.getValue.asInstanceOf[AlterPartitionRequest.Builder].build().data())
- }
-
- @ParameterizedTest
- @EnumSource(classOf[LeaderRecoveryState])
- def testBasicSentLeaderRecoveryState(leaderRecoveryState:
LeaderRecoveryState): Unit = {
- val requestCapture =
ArgumentCaptor.forClass(classOf[AbstractRequest.Builder[AlterPartitionRequest]])
-
- val scheduler = new MockScheduler(time)
- val alterPartitionManager = new
DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, ()
=> 2)
- alterPartitionManager.start()
- alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1,
List(1).map(Int.box).asJava, leaderRecoveryState, 10))
- verify(brokerToController).start()
- verify(brokerToController).sendRequest(requestCapture.capture(), any())
-
- val request = requestCapture.getValue.build()
- assertEquals(leaderRecoveryState.value,
request.data.topics.get(0).partitions.get(0).leaderRecoveryState())
- }
-
- @Test
- def testOverwriteWithinBatch(): Unit = {
- val capture:
ArgumentCaptor[AbstractRequest.Builder[AlterPartitionRequest]] =
ArgumentCaptor.forClass(classOf[AbstractRequest.Builder[AlterPartitionRequest]])
- val callbackCapture: ArgumentCaptor[ControllerRequestCompletionHandler] =
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
-
- val scheduler = new MockScheduler(time)
- val alterPartitionManager = new
DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, ()
=> 2)
- alterPartitionManager.start()
-
- // Only send one ISR update for a given topic+partition
- val firstSubmitFuture = alterPartitionManager.submit(tp0, new
LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava,
LeaderRecoveryState.RECOVERED, 10))
- assertFalse(firstSubmitFuture.isDone)
-
- val failedSubmitFuture = alterPartitionManager.submit(tp0, new
LeaderAndIsr(1, 1, List(1, 2).map(Int.box).asJava,
LeaderRecoveryState.RECOVERED, 10))
- assertTrue(failedSubmitFuture.isCompletedExceptionally)
- assertFutureThrows(classOf[OperationNotAttemptedException],
failedSubmitFuture)
-
- // Simulate response
- val alterPartitionResp = partitionResponse()
- val resp = makeClientResponse(
- response = alterPartitionResp,
- version = ApiKeys.ALTER_PARTITION.latestVersion
- )
- verify(brokerToController).sendRequest(capture.capture(),
callbackCapture.capture())
- callbackCapture.getValue.onComplete(resp)
-
- // Now we can submit this partition again
- val newSubmitFuture = alterPartitionManager.submit(tp0, new
LeaderAndIsr(1, 1, List(1).map(Int.box).asJava, LeaderRecoveryState.RECOVERED,
10))
- assertFalse(newSubmitFuture.isDone)
-
- verify(brokerToController).start()
- verify(brokerToController, times(2)).sendRequest(capture.capture(),
callbackCapture.capture())
-
- // Make sure we sent the right request ISR={1}
- val request = capture.getValue.build()
- assertEquals(request.data().topics().size(), 1)
- if (request.version() < 3) {
- assertEquals(request.data.topics.get(0).partitions.get(0).newIsr.size, 1)
- } else {
-
assertEquals(request.data.topics.get(0).partitions.get(0).newIsrWithEpochs.size,
1)
- }
- }
-
- @Test
- def testSingleBatch(): Unit = {
- val capture:
ArgumentCaptor[AbstractRequest.Builder[AlterPartitionRequest]] =
ArgumentCaptor.forClass(classOf[AbstractRequest.Builder[AlterPartitionRequest]])
- val callbackCapture: ArgumentCaptor[ControllerRequestCompletionHandler] =
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
-
- val scheduler = new MockScheduler(time)
- val alterPartitionManager = new
DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, ()
=> 2)
- alterPartitionManager.start()
-
- // First request will send batch of one
- alterPartitionManager.submit(new TopicIdPartition(topicId, 0),
- new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava,
LeaderRecoveryState.RECOVERED, 10))
-
- // Other submissions will queue up until a response
- for (i <- 1 to 9) {
- alterPartitionManager.submit(new TopicIdPartition(topicId, i),
- new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava,
LeaderRecoveryState.RECOVERED, 10))
- }
-
- // Simulate response, omitting partition 0 will allow it to stay in unsent
queue
- val alterPartitionResp = new AlterPartitionResponse(new
AlterPartitionResponseData())
- val resp = new ClientResponse(null, null, "", 0L, 0L,
- false, null, null, alterPartitionResp)
-
- // On the callback, we check for unsent items and send another request
- verify(brokerToController).sendRequest(capture.capture(),
callbackCapture.capture())
- callbackCapture.getValue.onComplete(resp)
-
- verify(brokerToController).start()
- verify(brokerToController, times(2)).sendRequest(capture.capture(),
callbackCapture.capture())
-
- // Verify the last request sent had all 10 items
- val request = capture.getValue.build()
- assertEquals(request.data().topics().size(), 1)
- assertEquals(request.data().topics().get(0).partitions().size(), 10)
- }
-
- @Test
- def testSubmitFromCallback(): Unit = {
- // prepare a partition level retriable error response
- val alterPartitionRespWithPartitionError = partitionResponse(tp0,
Errors.UNKNOWN_SERVER_ERROR)
- val errorResponse =
makeClientResponse(alterPartitionRespWithPartitionError,
ApiKeys.ALTER_PARTITION.latestVersion)
-
- val leaderId = 1
- val leaderEpoch = 1
- val partitionEpoch = 10
- val isr = List(1, 2, 3)
- val leaderAndIsr = new LeaderAndIsr(leaderId, leaderEpoch,
isr.map(Int.box).asJava, LeaderRecoveryState.RECOVERED, partitionEpoch)
- val callbackCapture =
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
-
- val scheduler = new MockScheduler(time)
- val alterPartitionManager = new
DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, ()
=> 2)
- alterPartitionManager.start()
- val future = alterPartitionManager.submit(tp0, leaderAndIsr)
- val finalFuture = new CompletableFuture[LeaderAndIsr]()
- future.whenComplete { (_, e) =>
- if (e != null) {
- // Retry when error.
- alterPartitionManager.submit(tp0, leaderAndIsr).whenComplete {
(result, e) =>
- if (e != null) {
- finalFuture.completeExceptionally(e)
- } else {
- finalFuture.complete(result)
- }
- }
- } else {
- finalFuture.completeExceptionally(new AssertionError("Expected the
future to be failed"))
- }
- }
-
- verify(brokerToController).start()
- verify(brokerToController).sendRequest(any(), callbackCapture.capture())
- reset(brokerToController)
- callbackCapture.getValue.onComplete(errorResponse)
-
- // Complete the retry request
- val retryAlterPartitionResponse = partitionResponse(tp0, Errors.NONE,
partitionEpoch, leaderId, leaderEpoch, isr)
- val retryResponse = makeClientResponse(retryAlterPartitionResponse,
ApiKeys.ALTER_PARTITION.latestVersion)
-
- verify(brokerToController).sendRequest(any(), callbackCapture.capture())
- callbackCapture.getValue.onComplete(retryResponse)
-
- assertEquals(leaderAndIsr, finalFuture.get(200, TimeUnit.MILLISECONDS))
- // No more items in unsentIsrUpdates
- assertFalse(alterPartitionManager.unsentIsrUpdates.containsKey(tp0))
- }
-
- @Test
- def testAuthorizationFailed(): Unit = {
- testRetryOnTopLevelError(Errors.CLUSTER_AUTHORIZATION_FAILED)
- }
-
- @Test
- def testStaleBrokerEpoch(): Unit = {
- testRetryOnTopLevelError(Errors.STALE_BROKER_EPOCH)
- }
-
- @Test
- def testUnknownServer(): Unit = {
- testRetryOnTopLevelError(Errors.UNKNOWN_SERVER_ERROR)
- }
-
- @Test
- def testRetryOnAuthenticationFailure(): Unit = {
- testRetryOnErrorResponse(new ClientResponse(null, null, "", 0L, 0L,
- false, null, new AuthenticationException("authentication failed"), null))
- }
-
- @Test
- def testRetryOnUnsupportedVersionError(): Unit = {
- testRetryOnErrorResponse(new ClientResponse(null, null, "", 0L, 0L,
- false, new UnsupportedVersionException("unsupported version"), null,
null))
- }
-
- private def testRetryOnTopLevelError(error: Errors): Unit = {
- val alterPartitionResp = new AlterPartitionResponse(new
AlterPartitionResponseData().setErrorCode(error.code))
- val response = makeClientResponse(alterPartitionResp,
ApiKeys.ALTER_PARTITION.latestVersion)
- testRetryOnErrorResponse(response)
- }
-
- private def testRetryOnErrorResponse(response: ClientResponse): Unit = {
- val leaderAndIsr = new LeaderAndIsr(1, 1, List(1, 2,
3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10)
- val callbackCapture: ArgumentCaptor[ControllerRequestCompletionHandler] =
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
-
- val scheduler = new MockScheduler(time)
- val alterPartitionManager = new
DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, ()
=> 2)
- alterPartitionManager.start()
- alterPartitionManager.submit(tp0, leaderAndIsr)
-
- verify(brokerToController).start()
- verify(brokerToController).sendRequest(any(), callbackCapture.capture())
- callbackCapture.getValue.onComplete(response)
-
- // Any top-level error, we want to retry, so we don't clear items from the
pending map
- assertTrue(alterPartitionManager.unsentIsrUpdates.containsKey(tp0))
-
- reset(brokerToController)
-
- // After some time, we will retry failed requests
- time.sleep(100)
- scheduler.tick()
-
- // After a successful response, we can submit another AlterIsrItem
- val retryAlterPartitionResponse = partitionResponse()
- val retryResponse = makeClientResponse(retryAlterPartitionResponse,
ApiKeys.ALTER_PARTITION.latestVersion)
-
- verify(brokerToController).sendRequest(any(), callbackCapture.capture())
- callbackCapture.getValue.onComplete(retryResponse)
-
- assertFalse(alterPartitionManager.unsentIsrUpdates.containsKey(tp0))
- }
-
- @Test
- def testInvalidUpdateVersion(): Unit = {
- checkPartitionError(Errors.INVALID_UPDATE_VERSION)
- }
-
- @Test
- def testUnknownTopicPartition(): Unit = {
- checkPartitionError(Errors.UNKNOWN_TOPIC_OR_PARTITION)
- }
-
- @Test
- def testNotLeaderOrFollower(): Unit = {
- checkPartitionError(Errors.NOT_LEADER_OR_FOLLOWER)
- }
-
- @Test
- def testInvalidRequest(): Unit = {
- checkPartitionError(Errors.INVALID_REQUEST)
- }
-
- private def checkPartitionError(error: Errors): Unit = {
- val alterPartitionManager = testPartitionError(tp0, error)
- // Any partition-level error should clear the item from the pending queue
allowing for future updates
- val future = alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1,
List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10))
- assertFalse(future.isDone)
- }
-
- private def testPartitionError(tp: TopicIdPartition, error: Errors):
AlterPartitionManager = {
- val callbackCapture: ArgumentCaptor[ControllerRequestCompletionHandler] =
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
- reset(brokerToController)
-
- val scheduler = new MockScheduler(time)
- val alterPartitionManager = new
DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, ()
=> 2)
- alterPartitionManager.start()
-
- val future = alterPartitionManager.submit(tp, new LeaderAndIsr(1, 1,
List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10))
-
- verify(brokerToController).start()
- verify(brokerToController).sendRequest(any(), callbackCapture.capture())
- reset(brokerToController)
-
- val alterPartitionResp = partitionResponse(tp, error)
- val resp = makeClientResponse(alterPartitionResp,
ApiKeys.ALTER_PARTITION.latestVersion)
- callbackCapture.getValue.onComplete(resp)
- assertTrue(future.isCompletedExceptionally)
- assertFutureThrows(error.exception.getClass, future)
- alterPartitionManager
- }
-
- @Test
- def testOneInFlight(): Unit = {
- val callbackCapture: ArgumentCaptor[ControllerRequestCompletionHandler] =
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
-
- val scheduler = new MockScheduler(time)
- val alterPartitionManager = new
DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, ()
=> 2)
- alterPartitionManager.start()
-
- // First submit will send the request
- alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1, 2,
3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10))
-
- // These will become pending unsent items
- alterPartitionManager.submit(tp1, new LeaderAndIsr(1, 1, List(1, 2,
3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10))
- alterPartitionManager.submit(tp2, new LeaderAndIsr(1, 1, List(1, 2,
3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10))
-
- verify(brokerToController).start()
- verify(brokerToController).sendRequest(any(), callbackCapture.capture())
-
- // Once the callback runs, another request will be sent
- reset(brokerToController)
-
- val alterPartitionResp = new AlterPartitionResponse(new
AlterPartitionResponseData())
- val resp = makeClientResponse(alterPartitionResp,
ApiKeys.ALTER_PARTITION.latestVersion)
- callbackCapture.getValue.onComplete(resp)
- }
-
- @Test
- def testPartitionMissingInResponse(): Unit = {
- val expectedVersion = ApiKeys.ALTER_PARTITION.latestVersion
- val leaderAndIsr = new LeaderAndIsr(1, 1, List(1, 2,
3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10)
- val brokerEpoch = 2
- val scheduler = new MockScheduler(time)
- val brokerToController =
Mockito.mock(classOf[NodeToControllerChannelManager])
- val alterPartitionManager = new DefaultAlterPartitionManager(
- brokerToController,
- scheduler,
- time,
- brokerId,
- () => brokerEpoch
- )
- alterPartitionManager.start()
-
- // The first `submit` will send the `AlterIsr` request
- val future1 = alterPartitionManager.submit(tp0, leaderAndIsr)
- val callback1 = verifySendRequest(brokerToController,
alterPartitionRequestMatcher(
- expectedTopicPartitions = Set(tp0),
- expectedVersion = expectedVersion
- ))
-
- // Additional calls while the `AlterIsr` request is inflight will be queued
- val future2 = alterPartitionManager.submit(tp1, leaderAndIsr)
- val future3 = alterPartitionManager.submit(tp2, leaderAndIsr)
-
- // Respond to the first request, which will also allow the next request to
get sent
- callback1.onComplete(makeClientResponse(
- response = partitionResponse(tp0, Errors.UNKNOWN_SERVER_ERROR),
- version = expectedVersion
- ))
- assertFutureThrows(classOf[UnknownServerException], future1)
- assertFalse(future2.isDone)
- assertFalse(future3.isDone)
-
- // Verify the second request includes both expected partitions, but only
respond with one of them
- val callback2 = verifySendRequest(brokerToController,
alterPartitionRequestMatcher(
- expectedTopicPartitions = Set(tp1, tp2),
- expectedVersion = expectedVersion
- ))
- callback2.onComplete(makeClientResponse(
- response = partitionResponse(tp2, Errors.UNKNOWN_SERVER_ERROR),
- version = expectedVersion
- ))
- assertFutureThrows(classOf[UnknownServerException], future3)
- assertFalse(future2.isDone)
-
- // The missing partition should be retried
- val callback3 = verifySendRequest(brokerToController,
alterPartitionRequestMatcher(
- expectedTopicPartitions = Set(tp1),
- expectedVersion = expectedVersion
- ))
- callback3.onComplete(makeClientResponse(
- response = partitionResponse(tp1, Errors.UNKNOWN_SERVER_ERROR),
- version = expectedVersion
- ))
- assertFutureThrows(classOf[UnknownServerException], future2)
- }
-
- private def verifySendRequest(
- brokerToController: NodeToControllerChannelManager,
- expectedRequest: ArgumentMatcher[AbstractRequest.Builder[_ <:
AbstractRequest]]
- ): ControllerRequestCompletionHandler = {
- val callbackCapture =
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
-
- Mockito.verify(brokerToController).sendRequest(
- ArgumentMatchers.argThat(expectedRequest),
- callbackCapture.capture()
- )
-
- Mockito.reset(brokerToController)
-
- callbackCapture.getValue
- }
-
- private def alterPartitionRequestMatcher(
- expectedTopicPartitions: Set[TopicIdPartition],
- expectedVersion: Short
- ): ArgumentMatcher[AbstractRequest.Builder[_ <: AbstractRequest]] = {
- request => {
- assertEquals(ApiKeys.ALTER_PARTITION, request.apiKey)
-
- val alterPartitionRequest =
request.asInstanceOf[AlterPartitionRequest.Builder].build()
- assertEquals(expectedVersion, alterPartitionRequest.version)
-
- val requestTopicIdPartitions =
alterPartitionRequest.data.topics.asScala.flatMap { topicData =>
- topicData.partitions.asScala.map { partitionData =>
- (topicData.topicId, partitionData.partitionIndex)
- }
- }.toSet
-
- expectedTopicPartitions.map(tp => (tp.topicId, tp.partitionId)) ==
requestTopicIdPartitions
- }
- }
-
- private def makeClientResponse(
- response: AlterPartitionResponse,
- version: Short
- ): ClientResponse = {
- new ClientResponse(
- new RequestHeader(response.apiKey, version, "", 0),
- null,
- "",
- 0L,
- 0L,
- false,
- null,
- null,
- // Response is serialized and deserialized to ensure that its does
- // not contain ignorable fields used by other versions.
-
AlterPartitionResponse.parse(MessageUtil.toByteBufferAccessor(response.data,
version), version)
- )
- }
-
- private def partitionResponse(
- tp: TopicIdPartition = tp0,
- error: Errors = Errors.NONE,
- partitionEpoch: Int = 0,
- leaderId: Int = 0,
- leaderEpoch: Int = 0,
- isr: List[Int] = List.empty
- ): AlterPartitionResponse = {
- new AlterPartitionResponse(new AlterPartitionResponseData()
- .setTopics(Collections.singletonList(
- new AlterPartitionResponseData.TopicData()
- .setTopicId(tp.topicId)
- .setPartitions(Collections.singletonList(
- new AlterPartitionResponseData.PartitionData()
- .setPartitionIndex(tp.partitionId)
- .setPartitionEpoch(partitionEpoch)
- .setLeaderEpoch(leaderEpoch)
- .setLeaderId(leaderId)
- .setIsr(isr.map(Integer.valueOf).asJava)
- .setErrorCode(error.code))))))
- }
-}
diff --git
a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index ff729c79699..eba5042f0a7 100755
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -28,7 +28,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.record.internal.SimpleRecord
import org.apache.kafka.metadata.{KRaftMetadataCache, MockConfigRepository}
import org.apache.kafka.server.common.KRaftVersion
-import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
+import org.apache.kafka.server.util.{KafkaScheduler, MockTime,
MockAlterPartitionManager}
import org.apache.kafka.storage.internals.log.{CleanerConfig,
LogDirFailureChannel}
import java.util.Optional
@@ -50,7 +50,7 @@ class HighwatermarkPersistenceTest {
new LogDirFailureChannel(config.logDirs.size)
}
- val alterIsrManager = TestUtils.createAlterIsrManager()
+ val alterIsrManager = new MockAlterPartitionManager()
@AfterEach
def teardown(): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
b/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
index 43cdef2ffb1..2c1a5fdb5de 100644
--- a/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
@@ -20,7 +20,6 @@ import java.util
import java.util.Properties
import kafka.cluster.Partition
import kafka.server.QuotaFactory.QuotaManagers
-import kafka.utils.TestUtils.MockAlterPartitionManager
import kafka.utils._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.metrics.Metrics
@@ -29,7 +28,7 @@ import org.apache.kafka.metadata.KRaftMetadataCache
import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.server.common.KRaftVersion
import org.apache.kafka.server.config.ReplicationConfigs
-import org.apache.kafka.server.util.MockTime
+import org.apache.kafka.server.util.{MockAlterPartitionManager, MockTime}
import org.apache.kafka.storage.internals.log.{LogDirFailureChannel,
LogManager, LogOffsetMetadata, UnifiedLog}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -64,7 +63,7 @@ class IsrExpirationTest {
val logManager: LogManager = mock(classOf[LogManager])
when(logManager.liveLogDirs).thenReturn(util.List.of)
- alterIsrManager = TestUtils.createAlterIsrManager()
+ alterIsrManager = new MockAlterPartitionManager()
quotaManager = QuotaFactory.instantiate(configs.head, metrics, time, "",
"")
replicaManager = new ReplicaManager(
metrics = metrics,
diff --git
a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
index 3386a8eee86..0ed1a999d71 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
@@ -43,6 +43,7 @@ import org.apache.kafka.raft.{KRaftConfigs, QuorumConfig}
import org.apache.kafka.server.HostedPartition
import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion,
TopicIdPartition}
import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
+import org.apache.kafka.server.partition.AlterPartitionManager
import org.apache.kafka.server.quota.ReplicationQuotaManager
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams,
FetchPartitionData}
import org.apache.kafka.server.util.{MockTime, ShutdownableThread}
@@ -489,6 +490,10 @@ class ReplicaManagerConcurrencyTest extends Logging {
): CompletableFuture[LeaderAndIsr] = {
channel.alterIsr(topicPartition, leaderAndIsr)
}
+
+ override def start(): Unit = {}
+
+ override def shutdown(): Unit = {}
}
private def registration(
diff --git
a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index 3a5c3a6309f..fc4cfec1466 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -28,6 +28,7 @@ import
org.apache.kafka.common.requests.FetchRequest.PartitionData
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.metadata.{KRaftMetadataCache, LeaderRecoveryState}
import org.apache.kafka.server.common.KRaftVersion
+import org.apache.kafka.server.partition.AlterPartitionManager
import org.apache.kafka.server.quota.ReplicaQuota
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams}
import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 857084099ef..b38e433b292 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -68,6 +68,7 @@ import org.apache.kafka.server.network.BrokerEndPoint
import org.apache.kafka.server.purgatory.{DelayedDeleteRecords,
DelayedOperationPurgatory, DelayedProduce, DelayedRemoteFetch,
DelayedRemoteListOffsets}
import org.apache.kafka.server.quota.{ReplicaQuota, ReplicationQuotaManager}
import org.apache.kafka.server.{HostedPartition, PartitionFetchState}
+import org.apache.kafka.server.partition.AlterPartitionManager
import org.apache.kafka.server.share.SharePartitionKey
import org.apache.kafka.server.share.fetch.{DelayedShareFetchGroupKey,
DelayedShareFetchKey, ShareFetch}
import org.apache.kafka.server.share.metrics.ShareGroupMetrics
diff --git
a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
index 660e1450013..a06aff1eed7 100644
---
a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
+++
b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
@@ -28,7 +28,7 @@ import org.apache.kafka.common.record.internal.RecordBatch
import
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH,
UNDEFINED_EPOCH_OFFSET}
import org.apache.kafka.metadata.KRaftMetadataCache
import org.apache.kafka.server.common.{KRaftVersion, OffsetAndEpoch}
-import org.apache.kafka.server.util.MockTime
+import org.apache.kafka.server.util.{MockAlterPartitionManager, MockTime}
import org.apache.kafka.storage.internals.log.{LogDirFailureChannel,
LogManager, UnifiedLog}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -42,7 +42,7 @@ class OffsetsForLeaderEpochTest {
private val config =
TestUtils.createBrokerConfigs(1).map(KafkaConfig.fromProps).head
private val time = new MockTime
private val metrics = new Metrics
- private val alterIsrManager = TestUtils.createAlterIsrManager()
+ private val alterIsrManager = new MockAlterPartitionManager()
private val tp = new TopicPartition("topic", 1)
private var replicaManager: ReplicaManager = _
private var quotaManager: QuotaManagers = _
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index b2f12f30331..c0bc7618a57 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -29,13 +29,13 @@ import org.apache.kafka.common._
import org.apache.kafka.common.acl.{AccessControlEntry,
AccessControlEntryFilter, AclBindingFilter}
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.config.{ConfigException, ConfigResource}
-import org.apache.kafka.common.errors.{OperationNotAttemptedException,
TopicExistsException, UnknownTopicOrPartitionException}
+import org.apache.kafka.common.errors.{TopicExistsException,
UnknownTopicOrPartitionException}
import org.apache.kafka.common.header.Header
import org.apache.kafka.common.internals.{Plugin, Topic}
import org.apache.kafka.common.memory.MemoryPool
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.{ClientInformation, ConnectionMode,
ListenerName}
-import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.record.internal._
import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.requests._
@@ -51,7 +51,6 @@ import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.network.metrics.RequestChannelMetrics
import org.apache.kafka.raft.{KRaftConfigs, QuorumConfig}
import org.apache.kafka.server.authorizer.{AuthorizableRequestContext,
Authorizer => JAuthorizer}
-import org.apache.kafka.server.common.TopicIdPartition
import org.apache.kafka.server.config.{DelegationTokenManagerConfigs,
ReplicationConfigs, ServerConfigs, ServerLogConfigs}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.MockTime
@@ -71,7 +70,6 @@ import java.nio.file.{Files, StandardOpenOption}
import java.time.Duration
import java.util
import java.util.concurrent._
-import java.util.concurrent.atomic.AtomicBoolean
import java.util.{Optional, Properties}
import scala.collection.mutable.ArrayBuffer
import scala.collection.{Map, Seq, mutable}
@@ -1004,52 +1002,6 @@ object TestUtils extends Logging {
logManager
}
- class MockAlterPartitionManager extends AlterPartitionManager {
- val isrUpdates: mutable.Queue[AlterPartitionItem] = new
mutable.Queue[AlterPartitionItem]()
- val inFlight: AtomicBoolean = new AtomicBoolean(false)
-
-
- override def submit(
- topicPartition: TopicIdPartition,
- leaderAndIsr: LeaderAndIsr,
- ): CompletableFuture[LeaderAndIsr]= {
- val future = new CompletableFuture[LeaderAndIsr]()
- if (inFlight.compareAndSet(false, true)) {
- isrUpdates += AlterPartitionItem(
- topicPartition,
- leaderAndIsr,
- future
- )
- } else {
- future.completeExceptionally(new OperationNotAttemptedException(
- s"Failed to enqueue AlterIsr request for $topicPartition since there
is already an inflight request"))
- }
- future
- }
-
- def completeIsrUpdate(newPartitionEpoch: Int): Unit = {
- if (inFlight.compareAndSet(true, false)) {
- val item = isrUpdates.dequeue()
-
item.future.complete(item.leaderAndIsr.withPartitionEpoch(newPartitionEpoch))
- } else {
- fail("Expected an in-flight ISR update, but there was none")
- }
- }
-
- def failIsrUpdate(error: Errors): Unit = {
- if (inFlight.compareAndSet(true, false)) {
- val item = isrUpdates.dequeue()
- item.future.completeExceptionally(error.exception)
- } else {
- fail("Expected an in-flight ISR update, but there was none")
- }
- }
- }
-
- def createAlterIsrManager(): MockAlterPartitionManager = {
- new MockAlterPartitionManager()
- }
-
def generateAndProduceMessages[B <: KafkaBroker](
brokers: Seq[B],
topic: String,
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index bb4b8bc92e5..28f3577f4a5 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -18,7 +18,6 @@
package org.apache.kafka.jmh.fetcher;
import kafka.cluster.Partition;
-import kafka.server.AlterPartitionManager;
import kafka.server.BrokerBlockingSender;
import kafka.server.FailedPartitions;
import kafka.server.InitialFetchState;
@@ -57,6 +56,7 @@ import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.OffsetAndEpoch;
import org.apache.kafka.server.network.BrokerEndPoint;
+import org.apache.kafka.server.partition.AlterPartitionManager;
import org.apache.kafka.server.quota.ReplicaQuota;
import org.apache.kafka.server.util.KafkaScheduler;
import org.apache.kafka.server.util.MockTime;
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
index 0ee9cf4fb35..68dddd2c8d2 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
@@ -19,7 +19,6 @@ package org.apache.kafka.jmh.partition;
import kafka.cluster.DelayedOperations;
import kafka.cluster.Partition;
-import kafka.server.AlterPartitionManager;
import kafka.server.builders.LogManagerBuilder;
import org.apache.kafka.common.DirectoryId;
@@ -35,6 +34,7 @@ import org.apache.kafka.metadata.MetadataCache;
import org.apache.kafka.metadata.MockConfigRepository;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.server.partition.AlterPartitionListener;
+import org.apache.kafka.server.partition.AlterPartitionManager;
import org.apache.kafka.server.util.KafkaScheduler;
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints;
import org.apache.kafka.storage.internals.log.CleanerConfig;
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
index c4b41a0296f..4ece85a1f45 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
@@ -19,7 +19,6 @@ package org.apache.kafka.jmh.partition;
import kafka.cluster.DelayedOperations;
import kafka.cluster.Partition;
-import kafka.server.AlterPartitionManager;
import kafka.server.builders.LogManagerBuilder;
import org.apache.kafka.common.DirectoryId;
@@ -31,6 +30,7 @@ import org.apache.kafka.metadata.MetadataCache;
import org.apache.kafka.metadata.MockConfigRepository;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.server.partition.AlterPartitionListener;
+import org.apache.kafka.server.partition.AlterPartitionManager;
import org.apache.kafka.server.replica.Replica;
import org.apache.kafka.server.util.KafkaScheduler;
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints;
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
index ed2b34af5a1..480ce855481 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
@@ -17,7 +17,6 @@
package org.apache.kafka.jmh.server;
import kafka.cluster.Partition;
-import kafka.server.AlterPartitionManager;
import kafka.server.KafkaConfig;
import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager;
@@ -33,6 +32,7 @@ import org.apache.kafka.jmh.util.BenchmarkConfigUtils;
import org.apache.kafka.metadata.KRaftMetadataCache;
import org.apache.kafka.metadata.MetadataCache;
import org.apache.kafka.metadata.MockConfigRepository;
+import org.apache.kafka.server.partition.AlterPartitionManager;
import org.apache.kafka.server.util.KafkaScheduler;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.server.util.Scheduler;
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
index 1692e2a64d7..5d1ac018bd7 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
@@ -17,7 +17,6 @@
package org.apache.kafka.jmh.server;
import kafka.cluster.Partition;
-import kafka.server.AlterPartitionManager;
import kafka.server.KafkaConfig;
import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager;
@@ -36,6 +35,7 @@ import org.apache.kafka.metadata.KRaftMetadataCache;
import org.apache.kafka.metadata.LeaderRecoveryState;
import org.apache.kafka.metadata.MockConfigRepository;
import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.server.partition.AlterPartitionManager;
import org.apache.kafka.server.util.KafkaScheduler;
import org.apache.kafka.server.util.Scheduler;
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints;
diff --git
a/server/src/main/java/org/apache/kafka/server/partition/AlterPartitionItem.java
b/server/src/main/java/org/apache/kafka/server/partition/AlterPartitionItem.java
new file mode 100644
index 00000000000..b92b128d709
--- /dev/null
+++
b/server/src/main/java/org/apache/kafka/server/partition/AlterPartitionItem.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.partition;
+
+import org.apache.kafka.metadata.LeaderAndIsr;
+import org.apache.kafka.server.common.TopicIdPartition;
+
+import java.util.concurrent.CompletableFuture;
+
+public record AlterPartitionItem(TopicIdPartition topicIdPartition,
LeaderAndIsr leaderAndIsr,
+ CompletableFuture<LeaderAndIsr> future) {
+}
diff --git
a/server/src/main/java/org/apache/kafka/server/partition/AlterPartitionManager.java
b/server/src/main/java/org/apache/kafka/server/partition/AlterPartitionManager.java
new file mode 100644
index 00000000000..076f8eccf85
--- /dev/null
+++
b/server/src/main/java/org/apache/kafka/server/partition/AlterPartitionManager.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.partition;
+
+import org.apache.kafka.metadata.LeaderAndIsr;
+import org.apache.kafka.server.common.TopicIdPartition;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Handles updating the ISR by sending AlterPartition requests to the
controller. Updating the ISR is an asynchronous
+ * operation, so partitions will learn about the result of their request
through a callback.
+ * <p>
+ * Note that ISR state changes can still be initiated by the controller and
sent to the partitions via LeaderAndIsr
+ * requests.
+ */
+public interface AlterPartitionManager {
+ void start();
+
+ void shutdown() throws InterruptedException;
+
+ CompletableFuture<LeaderAndIsr> submit(TopicIdPartition topicIdPartition,
LeaderAndIsr leaderAndIsr);
+}
diff --git
a/server/src/main/java/org/apache/kafka/server/partition/DefaultAlterPartitionManager.java
b/server/src/main/java/org/apache/kafka/server/partition/DefaultAlterPartitionManager.java
new file mode 100644
index 00000000000..dc43d1daee7
--- /dev/null
+++
b/server/src/main/java/org/apache/kafka/server/partition/DefaultAlterPartitionManager.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.partition;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.common.errors.OperationNotAttemptedException;
+import org.apache.kafka.common.message.AlterPartitionRequestData;
+import org.apache.kafka.common.message.AlterPartitionResponseData;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AlterPartitionRequest;
+import org.apache.kafka.common.requests.AlterPartitionResponse;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.LeaderAndIsr;
+import org.apache.kafka.metadata.LeaderRecoveryState;
+import org.apache.kafka.server.ControllerInformation;
+import org.apache.kafka.server.NodeToControllerChannelManagerImpl;
+import org.apache.kafka.server.common.ControllerRequestCompletionHandler;
+import org.apache.kafka.server.common.NodeToControllerChannelManager;
+import org.apache.kafka.server.common.TopicIdPartition;
+import org.apache.kafka.server.config.AbstractKafkaConfig;
+import org.apache.kafka.server.util.Scheduler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+public class DefaultAlterPartitionManager implements AlterPartitionManager {
+ private static final Logger log =
LoggerFactory.getLogger(DefaultAlterPartitionManager.class);
+ private final NodeToControllerChannelManager controllerChannelManager;
+ private final Scheduler scheduler;
+ private final int brokerId;
+ private final Supplier<Long> brokerEpochSupplier;
+
+ // Used to allow only one pending ISR update per partition (visible for
testing)
+ final ConcurrentHashMap<TopicIdPartition, AlterPartitionItem>
unsentIsrUpdates = new ConcurrentHashMap<>();
+
+ // Used to allow only one in-flight request at a time
+ private final AtomicBoolean inflightRequest = new AtomicBoolean(false);
+
+ public DefaultAlterPartitionManager(NodeToControllerChannelManager
controllerChannelManager, Scheduler scheduler, int brokerId, Supplier<Long>
brokerEpochSupplier) {
+ this.controllerChannelManager = controllerChannelManager;
+ this.scheduler = scheduler;
+ this.brokerId = brokerId;
+ this.brokerEpochSupplier = brokerEpochSupplier;
+ }
+
+ public static DefaultAlterPartitionManager create(AbstractKafkaConfig
config,
+ Scheduler scheduler,
+
Supplier<ControllerInformation> controllerNodeProvider,
+ Time time,
+ Metrics metrics,
+ String threadNamePrefix,
+ Supplier<Long>
brokerEpochSupplier) {
+ NodeToControllerChannelManager channelManager = new
NodeToControllerChannelManagerImpl(
+ controllerNodeProvider,
+ time,
+ metrics,
+ config,
+ "alter-partition",
+ threadNamePrefix,
+ Long.MAX_VALUE
+ );
+
+ return new DefaultAlterPartitionManager(channelManager, scheduler,
config.brokerId(), brokerEpochSupplier);
+ }
+
+ @Override
+ public void start() {
+ controllerChannelManager.start();
+ }
+
+ @Override
+ public void shutdown() throws InterruptedException {
+ controllerChannelManager.shutdown();
+ }
+
+ @Override
+ public CompletableFuture<LeaderAndIsr> submit(TopicIdPartition
topicIdPartition,
+ LeaderAndIsr leaderAndIsr) {
+ CompletableFuture<LeaderAndIsr> future = new CompletableFuture<>();
+ AlterPartitionItem alterPartitionItem = new
AlterPartitionItem(topicIdPartition, leaderAndIsr, future);
+ boolean enqueued =
unsentIsrUpdates.putIfAbsent(alterPartitionItem.topicIdPartition(),
alterPartitionItem) == null;
+ if (enqueued) {
+ maybePropagateIsrChanges();
+ } else {
+ future.completeExceptionally(new
OperationNotAttemptedException(String.format(
+ "Failed to enqueue ISR change state %s for partition %s",
leaderAndIsr, topicIdPartition)));
+ }
+ return future;
+ }
+
+ void maybePropagateIsrChanges() {
+ // Send all pending items if there is not already a request in-flight.
+ if (!unsentIsrUpdates.isEmpty() &&
inflightRequest.compareAndSet(false, true)) {
+ // Copy current unsent ISRs but don't remove from the map, they
get cleared in the response handler
+ List<AlterPartitionItem> inflightAlterPartitionItems = new
ArrayList<>(unsentIsrUpdates.values());
+ sendRequest(inflightAlterPartitionItems);
+ }
+ }
+
+ void clearInFlightRequest() {
+ if (!inflightRequest.compareAndSet(true, false)) {
+ log.warn("Attempting to clear AlterPartition in-flight flag when
no apparent request is in-flight");
+ }
+ }
+
+ private void sendRequest(List<AlterPartitionItem>
inflightAlterPartitionItems) {
+ long brokerEpoch = brokerEpochSupplier.get();
+ AlterPartitionRequest.Builder request =
buildRequest(inflightAlterPartitionItems, brokerEpoch);
+ log.debug("Sending AlterPartition to controller {}", request);
+
+ // We will not time out AlterPartition request, instead letting it
retry indefinitely
+ // until a response is received, or a new LeaderAndIsr overwrites the
existing isrState
+ // which causes the response for those partitions to be ignored.
+ controllerChannelManager.sendRequest(request,
+ new ControllerRequestCompletionHandler() {
+ @Override
+ public void onComplete(ClientResponse response) {
+ log.debug("Received AlterPartition response {}",
response);
+ Errors error;
+ try {
+ if (response.authenticationException() != null) {
+ // For now, we treat authentication errors as
retriable. We use the
+ // `NETWORK_EXCEPTION` error code for lack of
a good alternative.
+ // Note that `NodeToControllerChannelManager`
will still log the
+ // authentication errors so that users have a
chance to fix the problem.
+ error = Errors.NETWORK_EXCEPTION;
+ } else if (response.versionMismatch() != null) {
+ error = Errors.UNSUPPORTED_VERSION;
+ } else {
+ error = handleAlterPartitionResponse(
+ (AlterPartitionResponse)
response.responseBody(),
+ brokerEpoch,
+ inflightAlterPartitionItems
+ );
+ }
+ } finally {
+ // clear the flag so future requests can proceed
+ clearInFlightRequest();
+ }
+
+ // check if we need to send another request right away
+ if (error == Errors.NONE) {
+ // In the normal case, check for pending updates
to send immediately
+ maybePropagateIsrChanges();
+ } else {
+ // If we received a top-level error from the
controller, retry the request in the near future
+ scheduler.scheduleOnce("send-alter-partition", ()
-> maybePropagateIsrChanges(), 50);
+ }
+ }
+
+ @Override
+ public void onTimeout() {
+ throw new IllegalStateException("Encountered
unexpected timeout when sending AlterPartition to the controller");
+ }
+ });
+ }
+
+ /**
+ * Builds an AlterPartition request.
+ * <p>
+ * While building the request, we don't know which version of the
AlterPartition API is
+ * supported by the controller. The final decision is taken when the
AlterPartitionRequest
+ * is built in the network client based on the advertised api versions of
the controller.
+ *
+ * @return an AlterPartitionRequest.Builder with the provided parameters.
+ */
+ private AlterPartitionRequest.Builder buildRequest(
+ List<AlterPartitionItem> inflightAlterPartitionItems,
+ long brokerEpoch) {
+ AlterPartitionRequestData message = new AlterPartitionRequestData()
+ .setBrokerId(brokerId)
+ .setBrokerEpoch(brokerEpoch);
+
+ inflightAlterPartitionItems.stream()
+ .collect(Collectors.groupingBy(item ->
item.topicIdPartition().topicId()))
+ .forEach((topicId, items) -> {
+ AlterPartitionRequestData.TopicData topicData = new
AlterPartitionRequestData.TopicData().setTopicId(topicId);
+ message.topics().add(topicData);
+
+ items.forEach(item -> {
+ AlterPartitionRequestData.PartitionData partitionData =
+ new AlterPartitionRequestData.PartitionData()
+
.setPartitionIndex(item.topicIdPartition().partitionId())
+
.setLeaderEpoch(item.leaderAndIsr().leaderEpoch())
+
.setNewIsrWithEpochs(item.leaderAndIsr().isrWithBrokerEpoch())
+
.setPartitionEpoch(item.leaderAndIsr().partitionEpoch());
+
+
partitionData.setLeaderRecoveryState(item.leaderAndIsr().leaderRecoveryState().value());
+
+ topicData.partitions().add(partitionData);
+ });
+
+ });
+
+ return new AlterPartitionRequest.Builder(message);
+ }
+
+ private Errors handleAlterPartitionResponse(AlterPartitionResponse
alterPartitionResponse,
+ long sentBrokerEpoch,
+ List<AlterPartitionItem>
inflightAlterPartitionItems) {
+ AlterPartitionResponseData data = alterPartitionResponse.data();
+ Errors error = Errors.forCode(data.errorCode());
+ switch (error) {
+ case STALE_BROKER_EPOCH -> log.warn("Broker had a stale broker
epoch ({}), retrying.", sentBrokerEpoch);
+ case CLUSTER_AUTHORIZATION_FAILED -> log.error("Broker is not
authorized to send AlterPartition to controller",
+ Errors.CLUSTER_AUTHORIZATION_FAILED.exception("Broker is
not authorized to send AlterPartition to controller"));
+ case NONE -> {
+ // Collect partition-level responses to pass to the callbacks
+ Map<TopicIdPartition, LeaderAndIsr> successResponses = new
HashMap<>();
+ Map<TopicIdPartition, Errors> errorResponses = new HashMap<>();
+ data.topics().forEach(topic ->
+ topic.partitions().forEach(partition -> {
+ TopicIdPartition tp = new
TopicIdPartition(topic.topicId(), partition.partitionIndex());
+ Errors apiError =
Errors.forCode(partition.errorCode());
+ log.debug("Controller successfully handled
AlterPartition request for {}: {}", tp, partition);
+ if (apiError == Errors.NONE) {
+ Optional<LeaderRecoveryState>
leaderRecoveryStateOpt =
+
LeaderRecoveryState.optionalOf(partition.leaderRecoveryState());
+ if (leaderRecoveryStateOpt.isPresent()) {
+ successResponses.put(tp, new LeaderAndIsr(
+ partition.leaderId(),
+ partition.leaderEpoch(),
+ partition.isr(),
+ leaderRecoveryStateOpt.get(),
+ partition.partitionEpoch()
+ ));
+ } else {
+ log.error("Controller returned an invalid
leader recovery state ({}) for {}: {}", partition.leaderRecoveryState(), tp,
partition);
+ errorResponses.put(tp,
Errors.UNKNOWN_SERVER_ERROR);
+ }
+ } else {
+ errorResponses.put(tp, apiError);
+ }
+ }
+ ));
+ // Iterate across the items we sent rather than what we
received to ensure we run the callback even if a
+ // partition was somehow erroneously excluded from the
response. Note that these callbacks are run from
+ // the leaderIsrUpdateLock write lock in
Partition#sendAlterPartitionRequest
+ inflightAlterPartitionItems.forEach(inflightAlterPartition -> {
+ if
(successResponses.containsKey(inflightAlterPartition.topicIdPartition())) {
+ // Regardless of callback outcome, we need to clear
from the unsent updates map to unblock further
+ // updates. We clear it now to allow the callback to
submit a new update if needed.
+
unsentIsrUpdates.remove(inflightAlterPartition.topicIdPartition());
+
inflightAlterPartition.future().complete(successResponses.get(inflightAlterPartition.topicIdPartition()));
+ } else if
(errorResponses.containsKey(inflightAlterPartition.topicIdPartition())) {
+
unsentIsrUpdates.remove(inflightAlterPartition.topicIdPartition());
+
inflightAlterPartition.future().completeExceptionally(errorResponses.get(inflightAlterPartition.topicIdPartition()).exception());
+ } else {
+ // Don't remove this partition from the update map so
it will get re-sent
+ log.warn("Partition {} was sent but not included in
the response", inflightAlterPartition.topicIdPartition());
+ }
+ });
+ }
+ default -> log.warn("Controller returned an unexpected top-level
error when handling AlterPartition request: {}", error);
+ }
+
+ return error;
+ }
+
+}
diff --git
a/server/src/test/java/org/apache/kafka/server/partition/AlterPartitionManagerTest.java
b/server/src/test/java/org/apache/kafka/server/partition/AlterPartitionManagerTest.java
new file mode 100644
index 00000000000..e30a7702ab5
--- /dev/null
+++
b/server/src/test/java/org/apache/kafka/server/partition/AlterPartitionManagerTest.java
@@ -0,0 +1,543 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.partition;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.OperationNotAttemptedException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.AlterPartitionRequestData;
+import org.apache.kafka.common.message.AlterPartitionRequestData.BrokerState;
+import org.apache.kafka.common.message.AlterPartitionResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.MessageUtil;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AlterPartitionRequest;
+import org.apache.kafka.common.requests.AlterPartitionResponse;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.metadata.LeaderAndIsr;
+import org.apache.kafka.metadata.LeaderRecoveryState;
+import org.apache.kafka.server.common.ControllerRequestCompletionHandler;
+import org.apache.kafka.server.common.NodeToControllerChannelManager;
+import org.apache.kafka.server.common.TopicIdPartition;
+import org.apache.kafka.server.util.MockScheduler;
+import org.apache.kafka.server.util.MockTime;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.mockito.ArgumentCaptor;
+import org.mockito.ArgumentMatcher;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.test.TestUtils.assertFutureThrows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class AlterPartitionManagerTest {
+ private final Uuid topicId = Uuid.randomUuid();
+ private final MockTime time = new MockTime();
+ private final int brokerId = 1;
+
+ private NodeToControllerChannelManager brokerToController;
+
+ private final TopicIdPartition tp0 = new TopicIdPartition(topicId, 0);
+ private final TopicIdPartition tp1 = new TopicIdPartition(topicId, 1);
+ private final TopicIdPartition tp2 = new TopicIdPartition(topicId, 2);
+
+ @BeforeEach
+ public void setup() {
+ brokerToController = mock(NodeToControllerChannelManager.class);
+ }
+
+ @Test
+ public void testBasic() {
+ var scheduler = new MockScheduler(time);
+ var alterPartitionManager = new
DefaultAlterPartitionManager(brokerToController, scheduler, brokerId, () -> 2L);
+ alterPartitionManager.start();
+ alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List.of(1, 2,
3), LeaderRecoveryState.RECOVERED, 10));
+ verify(brokerToController).start();
+ verify(brokerToController).sendRequest(any(), any());
+ }
+
+ @Test
+ public void testBasicWithBrokerEpoch() {
+ var scheduler = new MockScheduler(time);
+ var alterPartitionManager = new
DefaultAlterPartitionManager(brokerToController, scheduler, brokerId, () ->
101L);
+ alterPartitionManager.start();
+ ArrayList<BrokerState> isrWithBrokerEpoch = new ArrayList<>();
+ for (int i = 1; i <= 3; i++) {
+ isrWithBrokerEpoch.add(new
BrokerState().setBrokerId(i).setBrokerEpoch(100 + i));
+ }
+ alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1,
LeaderRecoveryState.RECOVERED, isrWithBrokerEpoch, 10));
+
+ var expectedAlterPartitionData = new AlterPartitionRequestData()
+ .setBrokerId(brokerId)
+ .setBrokerEpoch(101);
+ var topicData = new
AlterPartitionRequestData.TopicData().setTopicId(topicId);
+
+ ArrayList<BrokerState> newIsrWithBrokerEpoch = new ArrayList<>();
+ newIsrWithBrokerEpoch.add(new
BrokerState().setBrokerId(1).setBrokerEpoch(101));
+ newIsrWithBrokerEpoch.add(new
BrokerState().setBrokerId(2).setBrokerEpoch(102));
+ newIsrWithBrokerEpoch.add(new
BrokerState().setBrokerId(3).setBrokerEpoch(103));
+
+ topicData.partitions().add(new
AlterPartitionRequestData.PartitionData()
+ .setPartitionIndex(0)
+ .setLeaderEpoch(1)
+ .setPartitionEpoch(10)
+ .setNewIsrWithEpochs(newIsrWithBrokerEpoch));
+
+ expectedAlterPartitionData.topics().add(topicData);
+
+ verify(brokerToController).start();
+ ArgumentCaptor<AbstractRequest.Builder<AlterPartitionRequest>> captor
= ArgumentCaptor.captor();
+ verify(brokerToController).sendRequest(captor.capture(), any());
+ AlterPartitionRequest.Builder builder =
(AlterPartitionRequest.Builder) captor.getValue();
+ assertEquals(expectedAlterPartitionData, builder.build().data());
+ }
+
+ @ParameterizedTest
+ @EnumSource(LeaderRecoveryState.class)
+ public void testBasicSentLeaderRecoveryState(LeaderRecoveryState
leaderRecoveryState) {
+ ArgumentCaptor<AbstractRequest.Builder<AlterPartitionRequest>>
requestCapture = ArgumentCaptor.captor();
+
+ var scheduler = new MockScheduler(time);
+ var alterPartitionManager = new
DefaultAlterPartitionManager(brokerToController, scheduler, brokerId, () -> 2L);
+ alterPartitionManager.start();
+ alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List.of(1),
leaderRecoveryState, 10));
+ verify(brokerToController).start();
+ verify(brokerToController).sendRequest(requestCapture.capture(),
any());
+
+ var request = requestCapture.getValue().build();
+ assertEquals(leaderRecoveryState.value(),
request.data().topics().get(0).partitions().get(0).leaderRecoveryState());
+ }
+
+ @Test
+ public void testOverwriteWithinBatch() {
+ ArgumentCaptor<AbstractRequest.Builder<AlterPartitionRequest>> capture
= ArgumentCaptor.captor();
+ ArgumentCaptor<ControllerRequestCompletionHandler> callbackCapture =
ArgumentCaptor.captor();
+
+ var scheduler = new MockScheduler(time);
+ var alterPartitionManager = new
DefaultAlterPartitionManager(brokerToController, scheduler, brokerId, () -> 2L);
+ alterPartitionManager.start();
+
+ // Only send one ISR update for a given topic+partition
+ var firstSubmitFuture = alterPartitionManager.submit(tp0, new
LeaderAndIsr(1, 1, List.of(1, 2, 3), LeaderRecoveryState.RECOVERED, 10));
+ assertFalse(firstSubmitFuture.isDone());
+
+ var failedSubmitFuture = alterPartitionManager.submit(tp0, new
LeaderAndIsr(1, 1, List.of(1, 2), LeaderRecoveryState.RECOVERED, 10));
+ assertTrue(failedSubmitFuture.isCompletedExceptionally());
+ assertFutureThrows(OperationNotAttemptedException.class,
failedSubmitFuture);
+
+ // Simulate response
+ var alterPartitionResp = partitionResponse(tp0, Errors.NONE, 0, 0, 0,
List.of());
+ var resp = makeClientResponse(alterPartitionResp,
ApiKeys.ALTER_PARTITION.latestVersion());
+
+ verify(brokerToController).sendRequest(capture.capture(),
callbackCapture.capture());
+ callbackCapture.getValue().onComplete(resp);
+
+ // Now we can submit this partition again
+ var newSubmitFuture = alterPartitionManager.submit(tp0, new
LeaderAndIsr(1, 1, List.of(1), LeaderRecoveryState.RECOVERED, 10));
+ assertFalse(newSubmitFuture.isDone());
+
+ verify(brokerToController).start();
+ verify(brokerToController, times(2)).sendRequest(capture.capture(),
callbackCapture.capture());
+
+ // Make sure we sent the right request ISR={1}
+ var request = capture.getValue().build();
+ assertEquals(1, request.data().topics().size());
+ if (request.version() < 3) {
+ assertEquals(1,
request.data().topics().get(0).partitions().get(0).newIsr().size());
+ } else {
+ assertEquals(1,
request.data().topics().get(0).partitions().get(0).newIsrWithEpochs().size());
+ }
+ }
+
+ @Test
+ public void testSingleBatch() {
+ ArgumentCaptor<AbstractRequest.Builder<AlterPartitionRequest>> capture
= ArgumentCaptor.captor();
+ ArgumentCaptor<ControllerRequestCompletionHandler> callbackCapture =
ArgumentCaptor.captor();
+
+ var scheduler = new MockScheduler(time);
+ var alterPartitionManager = new
DefaultAlterPartitionManager(brokerToController, scheduler, brokerId, () -> 2L);
+ alterPartitionManager.start();
+
+ // First request will send batch of one
+ alterPartitionManager.submit(new TopicIdPartition(topicId, 0),
+ new LeaderAndIsr(1, 1, List.of(1, 2, 3),
LeaderRecoveryState.RECOVERED, 10));
+
+ // Other submissions will queue up until a response
+ for (int i = 1; i <= 9; i++) {
+ alterPartitionManager.submit(new TopicIdPartition(topicId, i),
+ new LeaderAndIsr(1, 1, List.of(1, 2, 3),
LeaderRecoveryState.RECOVERED, 10));
+ }
+
+ // Simulate response, omitting partition 0 will allow it to stay in
unsent queue
+ var alterPartitionResp = new AlterPartitionResponse(new
AlterPartitionResponseData());
+ var resp = new ClientResponse(null, null, "", 0L, 0L,
+ false, null, null, alterPartitionResp);
+
+ // On the callback, we check for unsent items and send another request
+ verify(brokerToController).sendRequest(capture.capture(),
callbackCapture.capture());
+ callbackCapture.getValue().onComplete(resp);
+
+ verify(brokerToController).start();
+ verify(brokerToController, times(2)).sendRequest(capture.capture(),
callbackCapture.capture());
+
+ // Verify the last request sent had all 10 items
+ var request = capture.getValue().build();
+ assertEquals(1, request.data().topics().size());
+ assertEquals(10, request.data().topics().get(0).partitions().size());
+ }
+
+ @Test
+ public void testSubmitFromCallback() throws ExecutionException,
InterruptedException, TimeoutException {
+ // prepare a partition level retriable error response
+ var alterPartitionRespWithPartitionError = partitionResponse(tp0,
Errors.UNKNOWN_SERVER_ERROR, 0, 0, 0, List.of());
+ var errorResponse =
makeClientResponse(alterPartitionRespWithPartitionError,
ApiKeys.ALTER_PARTITION.latestVersion());
+
+ var leaderId = 1;
+ var leaderEpoch = 1;
+ var partitionEpoch = 10;
+ var isr = List.of(1, 2, 3);
+ var leaderAndIsr = new LeaderAndIsr(leaderId, leaderEpoch, isr,
LeaderRecoveryState.RECOVERED, partitionEpoch);
+ ArgumentCaptor<ControllerRequestCompletionHandler> callbackCapture =
ArgumentCaptor.captor();
+
+ var scheduler = new MockScheduler(time);
+ var alterPartitionManager = new
DefaultAlterPartitionManager(brokerToController, scheduler, brokerId, () -> 2L);
+ alterPartitionManager.start();
+ var future = alterPartitionManager.submit(tp0, leaderAndIsr);
+ var finalFuture = new CompletableFuture<LeaderAndIsr>();
+ future.whenComplete((result, error) -> {
+ if (error != null) {
+ // Retry when error.
+ alterPartitionManager.submit(tp0,
leaderAndIsr).whenComplete((retryResult, retryError) -> {
+ if (retryError != null) {
+ finalFuture.completeExceptionally(retryError);
+ } else {
+ finalFuture.complete(retryResult);
+ }
+ });
+ } else {
+ finalFuture.completeExceptionally(new AssertionError("Expected
to future to be failed"));
+ }
+ });
+
+ verify(brokerToController).start();
+ verify(brokerToController).sendRequest(any(),
callbackCapture.capture());
+ reset(brokerToController);
+ callbackCapture.getValue().onComplete(errorResponse);
+
+ // Complete the retry request
+ var retryAlterPartitionResponse = partitionResponse(tp0, Errors.NONE,
partitionEpoch, leaderId, leaderEpoch, isr);
+ var retryResponse = makeClientResponse(retryAlterPartitionResponse,
ApiKeys.ALTER_PARTITION.latestVersion());
+
+ verify(brokerToController).sendRequest(any(),
callbackCapture.capture());
+ callbackCapture.getValue().onComplete(retryResponse);
+
+ assertEquals(leaderAndIsr, finalFuture.get(200,
TimeUnit.MILLISECONDS));
+ // No more items in unsentIsrUpdates
+ assertFalse(alterPartitionManager.unsentIsrUpdates.containsKey(tp0));
+
+ }
+
+ @Test
+ public void testAuthorizationFailed() {
+ testRetryOnTopLevelError(Errors.CLUSTER_AUTHORIZATION_FAILED);
+ }
+
+ @Test
+ public void testStaleBrokerEpoch() {
+ testRetryOnTopLevelError(Errors.STALE_BROKER_EPOCH);
+ }
+
+ @Test
+ public void testUnknownServer() {
+ testRetryOnTopLevelError(Errors.UNKNOWN_SERVER_ERROR);
+ }
+
+ @Test
+ public void testRetryOnAuthenticationFailure() {
+ testRetryOnErrorResponse(new ClientResponse(null, null, "", 0L, 0L,
+ false, null, new AuthenticationException("authentication
failed"), null));
+ }
+
+ @Test
+ public void testRetryOnUnsupportedVersionError() {
+ testRetryOnErrorResponse(new ClientResponse(null, null, "", 0L, 0L,
+ false, new UnsupportedVersionException("unsupported version"),
null, null));
+ }
+
+ private void testRetryOnTopLevelError(Errors error) {
+ var alterPartitionResp = new AlterPartitionResponse(new
AlterPartitionResponseData().setErrorCode(error.code()));
+ var response = makeClientResponse(alterPartitionResp,
ApiKeys.ALTER_PARTITION.latestVersion());
+ testRetryOnErrorResponse(response);
+ }
+
+ private void testRetryOnErrorResponse(ClientResponse response) {
+ var leaderAndIsr = new LeaderAndIsr(1, 1, List.of(1, 2, 3),
LeaderRecoveryState.RECOVERED, 10);
+ ArgumentCaptor<ControllerRequestCompletionHandler> callbackCapture =
ArgumentCaptor.captor();
+
+ var scheduler = new MockScheduler(time);
+ var alterPartitionManager = new
DefaultAlterPartitionManager(brokerToController, scheduler, brokerId, () -> 2L);
+ alterPartitionManager.start();
+ alterPartitionManager.submit(tp0, leaderAndIsr);
+
+ verify(brokerToController).start();
+ verify(brokerToController).sendRequest(any(),
callbackCapture.capture());
+ callbackCapture.getValue().onComplete(response);
+
+ // Any top-level error, we want to retry, so we don't clear items from
the pending map
+ assertTrue(alterPartitionManager.unsentIsrUpdates.containsKey(tp0));
+
+ reset(brokerToController);
+
+ // After some time, we will retry failed requests
+ time.sleep(100);
+ scheduler.tick();
+
+ // After a successful response, we can submit another AlterIsrItem
+ var retryAlterPartitionResponse = partitionResponse(tp0, Errors.NONE,
0, 0, 0, List.of());
+ var retryResponse = makeClientResponse(retryAlterPartitionResponse,
ApiKeys.ALTER_PARTITION.latestVersion());
+
+ verify(brokerToController).sendRequest(any(),
callbackCapture.capture());
+ callbackCapture.getValue().onComplete(retryResponse);
+
+ assertFalse(alterPartitionManager.unsentIsrUpdates.containsKey(tp0));
+ }
+
+ @Test
+ public void testInvalidUpdateVersion() {
+ checkPartitionError(Errors.INVALID_UPDATE_VERSION);
+ }
+
+ @Test
+ public void testUnknownTopicPartition() {
+ checkPartitionError(Errors.UNKNOWN_TOPIC_OR_PARTITION);
+ }
+
+ @Test
+ public void testNotLeaderOrFollower() {
+ checkPartitionError(Errors.NOT_LEADER_OR_FOLLOWER);
+ }
+
+ @Test
+ public void testInvalidRequest() {
+ checkPartitionError(Errors.INVALID_REQUEST);
+ }
+
+ private void checkPartitionError(Errors error) {
+ var alterPartitionManager = testPartitionError(tp0, error);
+ var future = alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1,
List.of(1, 2, 3), LeaderRecoveryState.RECOVERED, 10));
+ assertFalse(future.isDone());
+ }
+
+ private AlterPartitionManager testPartitionError(TopicIdPartition tp,
Errors error) {
+ ArgumentCaptor<ControllerRequestCompletionHandler> callbackCapture =
ArgumentCaptor.captor();
+ reset(brokerToController);
+
+ var scheduler = new MockScheduler(time);
+ var alterPartitionManager = new
DefaultAlterPartitionManager(brokerToController, scheduler, brokerId, () -> 2L);
+ alterPartitionManager.start();
+
+ var future = alterPartitionManager.submit(tp, new LeaderAndIsr(1, 1,
List.of(1, 2, 3), LeaderRecoveryState.RECOVERED, 10));
+
+ verify(brokerToController).start();
+ verify(brokerToController).sendRequest(any(),
callbackCapture.capture());
+ reset(brokerToController);
+
+ var alterPartitionResp = partitionResponse(tp, error, 0, 0, 0,
List.of());
+ var resp = makeClientResponse(alterPartitionResp,
ApiKeys.ALTER_PARTITION.latestVersion());
+ callbackCapture.getValue().onComplete(resp);
+ assertTrue(future.isCompletedExceptionally());
+ assertFutureThrows(error.exception().getClass(), future);
+ return alterPartitionManager;
+ }
+
+ @Test
+ public void testOneInFlight() {
+ ArgumentCaptor<ControllerRequestCompletionHandler> callbackCapture =
ArgumentCaptor.captor();
+
+ var scheduler = new MockScheduler(time);
+ var alterPartitionManager = new
DefaultAlterPartitionManager(brokerToController, scheduler, brokerId, () -> 2L);
+ alterPartitionManager.start();
+
+ // First submit will send the request
+ alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List.of(1, 2,
3), LeaderRecoveryState.RECOVERED, 10));
+
+ // These will become pending unsent items
+ alterPartitionManager.submit(tp1, new LeaderAndIsr(1, 1, List.of(1, 2,
3), LeaderRecoveryState.RECOVERED, 10));
+ alterPartitionManager.submit(tp2, new LeaderAndIsr(1, 1, List.of(1, 2,
3), LeaderRecoveryState.RECOVERED, 10));
+
+ verify(brokerToController).start();
+ verify(brokerToController).sendRequest(any(),
callbackCapture.capture());
+
+ // Once the callback runs, another request will be sent
+ reset(brokerToController);
+
+ var alterPartitionResp = new AlterPartitionResponse(new
AlterPartitionResponseData());
+ var resp = makeClientResponse(alterPartitionResp,
ApiKeys.ALTER_PARTITION.latestVersion());
+ callbackCapture.getValue().onComplete(resp);
+
+ // Verify that pending items (tp1, tp2) are sent after the callback
+ verify(brokerToController).sendRequest(any(), any());
+ }
+
+ @Test
+ public void testPartitionMissingInResponse() {
+ var expectedVersion = ApiKeys.ALTER_PARTITION.latestVersion();
+ var leaderAndIsr = new LeaderAndIsr(1, 1, List.of(1, 2, 3),
LeaderRecoveryState.RECOVERED, 10);
+ var scheduler = new MockScheduler(time);
+ var alterPartitionManager = new
DefaultAlterPartitionManager(brokerToController, scheduler, brokerId, () -> 2L);
+ alterPartitionManager.start();
+
+ // The first `submit` will send the `AlterIsr` request
+ var future1 = alterPartitionManager.submit(tp0, leaderAndIsr);
+ var callback1 = verifySendRequest(brokerToController,
alterPartitionRequestMatcher(
+ Set.of(tp0),
+ expectedVersion
+ ));
+
+ // Additional calls while the `AlterIsr` request is inflight will be
queued
+ var future2 = alterPartitionManager.submit(tp1, leaderAndIsr);
+ var future3 = alterPartitionManager.submit(tp2, leaderAndIsr);
+
+ // Respond to the first request, which will also allow the next
request to get sent
+ callback1.onComplete(makeClientResponse(
+ partitionResponse(tp0, Errors.UNKNOWN_SERVER_ERROR, 0, 0, 0,
List.of()),
+ expectedVersion
+ ));
+ assertFutureThrows(UnknownServerException.class, future1);
+ assertFalse(future2.isDone());
+ assertFalse(future3.isDone());
+
+ // Verify the second request includes both expected partitions, but
only respond with one of them
+ var callback2 = verifySendRequest(brokerToController,
alterPartitionRequestMatcher(
+ Set.of(tp1, tp2),
+ expectedVersion
+ ));
+ callback2.onComplete(makeClientResponse(
+ partitionResponse(tp2, Errors.UNKNOWN_SERVER_ERROR, 0, 0, 0,
List.of()),
+ expectedVersion
+ ));
+ assertFutureThrows(UnknownServerException.class, future3);
+ assertFalse(future2.isDone());
+
+ // The missing partition should be retried
+ var callback3 = verifySendRequest(brokerToController,
alterPartitionRequestMatcher(
+ Set.of(tp1),
+ expectedVersion
+ ));
+ callback3.onComplete(makeClientResponse(
+ partitionResponse(tp1, Errors.UNKNOWN_SERVER_ERROR, 0, 0, 0,
List.of()),
+ expectedVersion
+ ));
+ assertFutureThrows(UnknownServerException.class, future2);
+
+ }
+
+ private ControllerRequestCompletionHandler
verifySendRequest(NodeToControllerChannelManager brokerToController,
+
ArgumentMatcher<AbstractRequest.Builder<? extends AbstractRequest>>
expectedRequest) {
+ ArgumentCaptor<ControllerRequestCompletionHandler> callbackCapture =
ArgumentCaptor.captor();
+
+ verify(brokerToController).sendRequest(argThat(expectedRequest),
callbackCapture.capture());
+
+ reset(brokerToController);
+
+ return callbackCapture.getValue();
+ }
+
+ private ArgumentMatcher<AbstractRequest.Builder<? extends
AbstractRequest>> alterPartitionRequestMatcher(Set<TopicIdPartition>
expectedTopicPartitions,
+
Short expectedVersion) {
+ return request -> {
+ assertEquals(ApiKeys.ALTER_PARTITION, request.apiKey());
+
+ var alterPartitionRequest = ((AlterPartitionRequest.Builder)
request).build();
+ assertEquals(expectedVersion, alterPartitionRequest.version());
+
+ var requestTopicPartitions =
alterPartitionRequest.data().topics().stream()
+ .flatMap(topicData ->
+ topicData.partitions().stream()
+ .map(partitionData ->
+ new
TopicPartitionKey(topicData.topicId(), partitionData.partitionIndex()))
+ ).collect(Collectors.toSet());
+
+ var expectedSet = expectedTopicPartitions.stream()
+ .map(tp -> new TopicPartitionKey(tp.topicId(),
tp.partitionId()))
+ .collect(Collectors.toSet());
+
+ return expectedSet.equals(requestTopicPartitions);
+ };
+ }
+
+ record TopicPartitionKey(Uuid topicId, int partitionId) { }
+
+ private ClientResponse makeClientResponse(AlterPartitionResponse response,
short version) {
+ return new ClientResponse(
+ new RequestHeader(response.apiKey(), version, "", 0),
+ null,
+ "",
+ 0L,
+ 0L,
+ false,
+ null,
+ null,
+ // Response is serialized and deserialized to ensure that its
does
+ // not contain ignorable fields used by other versions.
+
AlterPartitionResponse.parse(MessageUtil.toByteBufferAccessor(response.data(),
version), version)
+ );
+ }
+
+ private AlterPartitionResponse partitionResponse(
+ TopicIdPartition tp,
+ Errors error,
+ int partitionEpoch,
+ int leaderId,
+ int leaderEpoch,
+ List<Integer> isr) {
+ return new AlterPartitionResponse(new AlterPartitionResponseData()
+ .setTopics(List.of(new AlterPartitionResponseData.TopicData()
+ .setTopicId(tp.topicId())
+ .setPartitions(List.of(new
AlterPartitionResponseData.PartitionData()
+ .setPartitionIndex(tp.partitionId())
+ .setPartitionEpoch(partitionEpoch)
+ .setLeaderEpoch(leaderEpoch)
+ .setLeaderId(leaderId)
+ .setIsr(isr)
+ .setErrorCode(error.code()))))));
+ }
+}
diff --git
a/server/src/test/java/org/apache/kafka/server/util/MockAlterPartitionManager.java
b/server/src/test/java/org/apache/kafka/server/util/MockAlterPartitionManager.java
new file mode 100644
index 00000000000..a4d4d07697d
--- /dev/null
+++
b/server/src/test/java/org/apache/kafka/server/util/MockAlterPartitionManager.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.util;
+
+import org.apache.kafka.common.errors.OperationNotAttemptedException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.metadata.LeaderAndIsr;
+import org.apache.kafka.server.common.TopicIdPartition;
+import org.apache.kafka.server.partition.AlterPartitionItem;
+import org.apache.kafka.server.partition.AlterPartitionManager;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class MockAlterPartitionManager implements AlterPartitionManager {
+ // Visible for testing
+ public final Deque<AlterPartitionItem> isrUpdates = new ArrayDeque<>();
+ private final AtomicBoolean inFlight = new AtomicBoolean(false);
+
+ @Override
+ public void start() {
+
+ }
+
+ @Override
+ public void shutdown() throws InterruptedException {
+
+ }
+
+ @Override
+ public CompletableFuture<LeaderAndIsr> submit(TopicIdPartition
topicIdPartition, LeaderAndIsr leaderAndIsr) {
+ var future = new CompletableFuture<LeaderAndIsr>();
+ if (inFlight.compareAndSet(false, true)) {
+ isrUpdates.add(new AlterPartitionItem(topicIdPartition,
leaderAndIsr, future));
+ } else {
+ future.completeExceptionally(new OperationNotAttemptedException(
+ String.format("Failed to enqueue AlterIsr request for %s
since there is already an inflight request",
+ topicIdPartition)
+ ));
+ }
+ return future;
+ }
+
+ public void completeIsrUpdate(int newPartitionEpoch) {
+ if (inFlight.compareAndSet(true, false)) {
+ var item = isrUpdates.poll();
+
item.future().complete(item.leaderAndIsr().withPartitionEpoch(newPartitionEpoch));
+ } else {
+ fail("Expected an in-flight ISR update, but there was none");
+ }
+ }
+
+ public void failIsrUpdate(Errors error) {
+ if (inFlight.compareAndSet(true, false)) {
+ var item = isrUpdates.poll();
+ item.future().completeExceptionally(error.exception());
+ } else {
+ fail("Expected an in-flight ISR update, but there was none");
+ }
+ }
+}