This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c0e77d16260 KAFKA-19914: Move AlterPartitionManager to server module (#21432)
c0e77d16260 is described below

commit c0e77d1626078c8e0b7c3021cb2ecae2ed325e11
Author: Chih-Yuan Chien <[email protected]>
AuthorDate: Fri Apr 10 18:41:21 2026 +0800

    KAFKA-19914: Move AlterPartitionManager to server module (#21432)
    
    Migrate AlterPartitionManager from Scala to Java in the
    `org.apache.kafka.server.partition` package, including the interface,
    `DefaultAlterPartitionManager` implementation, and `AlterPartitionItem`
    record. Also migrate `AlterPartitionManagerTest` and move
    `MockAlterPartitionManager` from `TestUtils.scala` to
    `org.apache.kafka.server.util`.
    
    Reviewers: Mickael Maison <[email protected]>
---
 build.gradle                                       |   1 +
 .../server/builders/ReplicaManagerBuilder.java     |   2 +-
 core/src/main/scala/kafka/cluster/Partition.scala  |   2 +-
 .../scala/kafka/server/AlterPartitionManager.scala | 308 ------------
 .../src/main/scala/kafka/server/BrokerServer.scala |   9 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |   2 +-
 .../kafka/server/LocalLeaderEndPointTest.scala     |   1 +
 .../unit/kafka/cluster/AbstractPartitionTest.scala |   4 +-
 .../unit/kafka/cluster/PartitionLockTest.scala     |   3 +-
 .../scala/unit/kafka/cluster/PartitionTest.scala   |  33 +-
 .../kafka/server/AlterPartitionManagerTest.scala   | 533 --------------------
 .../server/HighwatermarkPersistenceTest.scala      |   4 +-
 .../unit/kafka/server/IsrExpirationTest.scala      |   5 +-
 .../server/ReplicaManagerConcurrencyTest.scala     |   5 +
 .../kafka/server/ReplicaManagerQuotasTest.scala    |   1 +
 .../unit/kafka/server/ReplicaManagerTest.scala     |   1 +
 .../server/epoch/OffsetsForLeaderEpochTest.scala   |   4 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  52 +-
 .../jmh/fetcher/ReplicaFetcherThreadBenchmark.java |   2 +-
 .../partition/PartitionMakeFollowerBenchmark.java  |   2 +-
 .../UpdateFollowerFetchStateBenchmark.java         |   2 +-
 .../apache/kafka/jmh/server/CheckpointBench.java   |   2 +-
 .../kafka/jmh/server/PartitionCreationBench.java   |   2 +-
 .../kafka/server/partition/AlterPartitionItem.java |  27 +
 .../server/partition/AlterPartitionManager.java    |  38 ++
 .../partition/DefaultAlterPartitionManager.java    | 287 +++++++++++
 .../partition/AlterPartitionManagerTest.java       | 543 +++++++++++++++++++++
 .../server/util/MockAlterPartitionManager.java     |  80 +++
 28 files changed, 1024 insertions(+), 931 deletions(-)

diff --git a/build.gradle b/build.gradle
index 36edce22f3f..6570cd7bbb2 100644
--- a/build.gradle
+++ b/build.gradle
@@ -3446,6 +3446,7 @@ project(':jmh-benchmarks') {
     implementation project(':connect:json')
     implementation project(':clients').sourceSets.test.output
     implementation project(':server-common').sourceSets.test.output
+    implementation project(':server').sourceSets.test.output
     implementation project(':metadata').sourceSets.test.output
 
     implementation libs.jmhCore
diff --git a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
index 1eecd995024..561447d9f97 100644
--- a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
+++ b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
@@ -17,7 +17,6 @@
 
 package kafka.server.builders;
 
-import kafka.server.AlterPartitionManager;
 import kafka.server.KafkaConfig;
 import kafka.server.QuotaFactory.QuotaManagers;
 import kafka.server.ReplicaManager;
@@ -27,6 +26,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.metadata.MetadataCache;
 import org.apache.kafka.server.DelayedActionQueue;
 import org.apache.kafka.server.common.DirectoryEventHandler;
+import org.apache.kafka.server.partition.AlterPartitionManager;
 import org.apache.kafka.server.util.Scheduler;
 import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
 import org.apache.kafka.storage.internals.log.LogManager;
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index f303b0fede7..49cd785355b 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -43,7 +43,7 @@ import 
org.apache.kafka.server.log.remote.storage.RemoteLogManager
 import org.apache.kafka.storage.internals.log.{AppendOrigin, 
AsyncOffsetReader, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogManager, 
LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo, 
LogStartOffsetIncrementReason, OffsetResultHolder, UnifiedLog, 
VerificationGuard}
 import org.apache.kafka.common.metrics.internals.MetricsUtils
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
-import org.apache.kafka.server.partition.{AlterPartitionListener, 
AssignmentState, CommittedPartitionState, OngoingReassignmentState, 
PartitionListener, PartitionState, PendingExpandIsr, PendingPartitionChange, 
PendingShrinkIsr, SimpleAssignmentState}
+import org.apache.kafka.server.partition.{AlterPartitionListener, 
AlterPartitionManager, AssignmentState, CommittedPartitionState, 
OngoingReassignmentState, PartitionListener, PartitionState, PendingExpandIsr, 
PendingPartitionChange, PendingShrinkIsr, SimpleAssignmentState}
 import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, 
DelayedOperationPurgatory, DelayedProduce, TopicPartitionOperationKey}
 import org.apache.kafka.server.replica.Replica
 import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey
diff --git a/core/src/main/scala/kafka/server/AlterPartitionManager.scala b/core/src/main/scala/kafka/server/AlterPartitionManager.scala
deleted file mode 100644
index 3b060a35d8f..00000000000
--- a/core/src/main/scala/kafka/server/AlterPartitionManager.scala
+++ /dev/null
@@ -1,308 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.server
-
-import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.{CompletableFuture, ConcurrentHashMap}
-import kafka.utils.Logging
-import org.apache.kafka.clients.ClientResponse
-import org.apache.kafka.common.errors.OperationNotAttemptedException
-import org.apache.kafka.common.message.AlterPartitionRequestData
-import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.{AlterPartitionRequest, 
AlterPartitionResponse}
-import org.apache.kafka.common.utils.Time
-import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState}
-import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, 
NodeToControllerChannelManager, TopicIdPartition}
-import org.apache.kafka.server.util.Scheduler
-import org.apache.kafka.server.{ControllerInformation, 
NodeToControllerChannelManagerImpl}
-
-import java.util.function.Supplier
-import scala.collection.mutable
-import scala.collection.mutable.ListBuffer
-import scala.jdk.OptionConverters.RichOptional
-
-/**
- * Handles updating the ISR by sending AlterPartition requests to the 
controller. Updating the ISR is an asynchronous 
- * operation, so partitions will learn about the result of their request 
through a callback.
- *
- * Note that ISR state changes can still be initiated by the controller and 
sent to the partitions via LeaderAndIsr
- * requests.
- */
-trait AlterPartitionManager {
-  def start(): Unit = {}
-
-  def shutdown(): Unit = {}
-
-  def submit(
-    topicIdPartition: TopicIdPartition,
-    leaderAndIsr: LeaderAndIsr
-  ): CompletableFuture[LeaderAndIsr]
-}
-
-case class AlterPartitionItem(
-  topicIdPartition: TopicIdPartition,
-  leaderAndIsr: LeaderAndIsr,
-  future: CompletableFuture[LeaderAndIsr]
-)
-
-object AlterPartitionManager {
-
-  /**
-   * Factory to AlterPartition based implementation
-   */
-  def apply(
-    config: KafkaConfig,
-    scheduler: Scheduler,
-    controllerNodeProvider: Supplier[ControllerInformation],
-    time: Time,
-    metrics: Metrics,
-    threadNamePrefix: String,
-    brokerEpochSupplier: () => Long,
-  ): AlterPartitionManager = {
-    val channelManager = new NodeToControllerChannelManagerImpl(
-      controllerNodeProvider,
-      time,
-      metrics,
-      config,
-      "alter-partition",
-      threadNamePrefix,
-      Long.MaxValue
-    )
-    new DefaultAlterPartitionManager(
-      controllerChannelManager = channelManager,
-      scheduler = scheduler,
-      time = time,
-      brokerId = config.brokerId,
-      brokerEpochSupplier = brokerEpochSupplier
-    )
-  }
-}
-
-class DefaultAlterPartitionManager(
-  val controllerChannelManager: NodeToControllerChannelManager,
-  val scheduler: Scheduler,
-  val time: Time,
-  val brokerId: Int,
-  val brokerEpochSupplier: () => Long,
-) extends AlterPartitionManager with Logging {
-
-  // Used to allow only one pending ISR update per partition (visible for 
testing)
-  private[server] val unsentIsrUpdates = new 
ConcurrentHashMap[TopicIdPartition, AlterPartitionItem]()
-
-  // Used to allow only one in-flight request at a time
-  private val inflightRequest: AtomicBoolean = new AtomicBoolean(false)
-
-  override def start(): Unit = {
-    controllerChannelManager.start()
-  }
-
-  override def shutdown(): Unit = {
-    controllerChannelManager.shutdown()
-  }
-
-  override def submit(
-    topicIdPartition: TopicIdPartition,
-    leaderAndIsr: LeaderAndIsr
-  ): CompletableFuture[LeaderAndIsr] = {
-    val future = new CompletableFuture[LeaderAndIsr]()
-    val alterPartitionItem = AlterPartitionItem(topicIdPartition, 
leaderAndIsr, future)
-    val enqueued = 
unsentIsrUpdates.putIfAbsent(alterPartitionItem.topicIdPartition, 
alterPartitionItem) == null
-    if (enqueued) {
-      maybePropagateIsrChanges()
-    } else {
-      future.completeExceptionally(new OperationNotAttemptedException(
-        s"Failed to enqueue ISR change state $leaderAndIsr for partition 
$topicIdPartition"))
-    }
-    future
-  }
-
-  private[server] def maybePropagateIsrChanges(): Unit = {
-    // Send all pending items if there is not already a request in-flight.
-    if (!unsentIsrUpdates.isEmpty && inflightRequest.compareAndSet(false, 
true)) {
-      // Copy current unsent ISRs but don't remove from the map, they get 
cleared in the response handler
-      val inflightAlterPartitionItems = new ListBuffer[AlterPartitionItem]()
-      unsentIsrUpdates.values.forEach(item => 
inflightAlterPartitionItems.append(item))
-      sendRequest(inflightAlterPartitionItems.toSeq)
-    }
-  }
-
-  private[server] def clearInFlightRequest(): Unit = {
-    if (!inflightRequest.compareAndSet(true, false)) {
-      warn("Attempting to clear AlterPartition in-flight flag when no apparent 
request is in-flight")
-    }
-  }
-
-  private def sendRequest(inflightAlterPartitionItems: 
Seq[AlterPartitionItem]): Unit = {
-    val brokerEpoch = brokerEpochSupplier()
-    val request = buildRequest(inflightAlterPartitionItems, brokerEpoch)
-    debug(s"Sending AlterPartition to controller $request")
-
-    // We will not time out AlterPartition request, instead letting it retry 
indefinitely
-    // until a response is received, or a new LeaderAndIsr overwrites the 
existing isrState
-    // which causes the response for those partitions to be ignored.
-    controllerChannelManager.sendRequest(request,
-      new ControllerRequestCompletionHandler {
-        override def onComplete(response: ClientResponse): Unit = {
-          debug(s"Received AlterPartition response $response")
-          val error = try {
-            if (response.authenticationException != null) {
-              // For now, we treat authentication errors as retriable. We use 
the
-              // `NETWORK_EXCEPTION` error code for lack of a good alternative.
-              // Note that `NodeToControllerChannelManager` will still log the
-              // authentication errors so that users have a chance to fix the 
problem.
-              Errors.NETWORK_EXCEPTION
-            } else if (response.versionMismatch != null) {
-              Errors.UNSUPPORTED_VERSION
-            } else {
-              handleAlterPartitionResponse(
-                response.responseBody.asInstanceOf[AlterPartitionResponse],
-                brokerEpoch,
-                inflightAlterPartitionItems
-              )
-            }
-          } finally {
-            // clear the flag so future requests can proceed
-            clearInFlightRequest()
-          }
-
-          // check if we need to send another request right away
-          error match {
-              case Errors.NONE =>
-                // In the normal case, check for pending updates to send 
immediately
-                maybePropagateIsrChanges()
-              case _ =>
-                // If we received a top-level error from the controller, retry 
the request in the near future
-                scheduler.scheduleOnce("send-alter-partition", () => 
maybePropagateIsrChanges(), 50)
-            }
-        }
-
-        override def onTimeout(): Unit = {
-          throw new IllegalStateException("Encountered unexpected timeout when 
sending AlterPartition to the controller")
-        }
-      })
-  }
-
-  /**
-   * Builds an AlterPartition request.
-   *
-   * While building the request, we don't know which version of the 
AlterPartition API is
-   * supported by the controller. The final decision is taken when the 
AlterPartitionRequest
-   * is built in the network client based on the advertised api versions of 
the controller.
-   *
-   * @return an AlterPartitionRequest.Builder with the provided parameters.
-   */
-  private def buildRequest(
-    inflightAlterPartitionItems: Seq[AlterPartitionItem],
-    brokerEpoch: Long
-  ): AlterPartitionRequest.Builder = {
-    val message = new AlterPartitionRequestData()
-      .setBrokerId(brokerId)
-      .setBrokerEpoch(brokerEpoch)
-
-    inflightAlterPartitionItems.groupBy(_.topicIdPartition.topicId).foreach { 
case (topicId, items) =>
-      val topicData = new 
AlterPartitionRequestData.TopicData().setTopicId(topicId)
-      message.topics.add(topicData)
-
-      items.foreach { item =>
-        val partitionData = new AlterPartitionRequestData.PartitionData()
-          .setPartitionIndex(item.topicIdPartition.partitionId)
-          .setLeaderEpoch(item.leaderAndIsr.leaderEpoch)
-          .setNewIsrWithEpochs(item.leaderAndIsr.isrWithBrokerEpoch)
-          .setPartitionEpoch(item.leaderAndIsr.partitionEpoch)
-
-        
partitionData.setLeaderRecoveryState(item.leaderAndIsr.leaderRecoveryState.value)
-
-        topicData.partitions.add(partitionData)
-      }
-    }
-
-    new AlterPartitionRequest.Builder(message)
-  }
-
-  private def handleAlterPartitionResponse(
-    alterPartitionResp: AlterPartitionResponse,
-    sentBrokerEpoch: Long,
-    inflightAlterPartitionItems: Seq[AlterPartitionItem],
-  ): Errors = {
-    val data = alterPartitionResp.data
-
-    Errors.forCode(data.errorCode) match {
-      case Errors.STALE_BROKER_EPOCH =>
-        warn(s"Broker had a stale broker epoch ($sentBrokerEpoch), retrying.")
-
-      case Errors.CLUSTER_AUTHORIZATION_FAILED =>
-        error(s"Broker is not authorized to send AlterPartition to controller",
-          Errors.CLUSTER_AUTHORIZATION_FAILED.exception("Broker is not 
authorized to send AlterPartition to controller"))
-
-      case Errors.NONE =>
-        // Collect partition-level responses to pass to the callbacks
-        val partitionResponses = new mutable.HashMap[TopicIdPartition, 
Either[Errors, LeaderAndIsr]]()
-        data.topics.forEach { topic =>
-          topic.partitions.forEach { partition =>
-            val tp = new TopicIdPartition(topic.topicId, 
partition.partitionIndex)
-            val apiError = Errors.forCode(partition.errorCode)
-            debug(s"Controller successfully handled AlterPartition request for 
$tp: $partition")
-            if (apiError == Errors.NONE) {
-              
LeaderRecoveryState.optionalOf(partition.leaderRecoveryState).toScala match {
-                case Some(leaderRecoveryState) =>
-                  partitionResponses(tp) = Right(
-                    new LeaderAndIsr(
-                      partition.leaderId,
-                      partition.leaderEpoch,
-                      partition.isr,
-                      leaderRecoveryState,
-                      partition.partitionEpoch
-                    )
-                  )
-
-                case None =>
-                  error(s"Controller returned an invalid leader recovery state 
(${partition.leaderRecoveryState}) for $tp: $partition")
-                  partitionResponses(tp) = Left(Errors.UNKNOWN_SERVER_ERROR)
-              }
-            } else {
-              partitionResponses(tp) = Left(apiError)
-            }
-          }
-        }
-
-        // Iterate across the items we sent rather than what we received to 
ensure we run the callback even if a
-        // partition was somehow erroneously excluded from the response. Note 
that these callbacks are run from
-        // the leaderIsrUpdateLock write lock in 
Partition#sendAlterPartitionRequest
-        inflightAlterPartitionItems.foreach { inflightAlterPartition =>
-          partitionResponses.get(inflightAlterPartition.topicIdPartition) 
match {
-            case Some(leaderAndIsrOrError) =>
-              // Regardless of callback outcome, we need to clear from the 
unsent updates map to unblock further
-              // updates. We clear it now to allow the callback to submit a 
new update if needed.
-              unsentIsrUpdates.remove(inflightAlterPartition.topicIdPartition)
-              leaderAndIsrOrError match {
-                case Left(error) => 
inflightAlterPartition.future.completeExceptionally(error.exception)
-                case Right(leaderAndIsr) => 
inflightAlterPartition.future.complete(leaderAndIsr)
-              }
-            case None =>
-              // Don't remove this partition from the update map so it will 
get re-sent
-              warn(s"Partition ${inflightAlterPartition.topicIdPartition} was 
sent but not included in the response")
-          }
-        }
-
-      case e =>
-        warn(s"Controller returned an unexpected top-level error when handling 
AlterPartition request: $e")
-    }
-
-    Errors.forCode(data.errorCode)
-  }
-}
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 102981827ce..fe0fd1d2ec5 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -60,6 +60,7 @@ import org.apache.kafka.server.{AssignmentsManager, 
BrokerFeatures, BrokerLifecy
 import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
 import org.apache.kafka.storage.internals.log.{LogDirFailureChannel, 
LogManager => JLogManager}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
+import org.apache.kafka.server.partition.{AlterPartitionManager, 
DefaultAlterPartitionManager}
 
 import java.time.Duration
 import java.util
@@ -290,14 +291,14 @@ class BrokerServer(
 
       remoteLogManagerOpt = createRemoteLogManager(listenerInfo)
 
-      alterPartitionManager = AlterPartitionManager(
+      alterPartitionManager = DefaultAlterPartitionManager.create(
         config,
-        scheduler = kafkaScheduler,
+        kafkaScheduler,
         controllerNodeProvider,
-        time = time,
+        time,
         metrics,
         s"broker-${config.nodeId}-",
-        brokerEpochSupplier = () => lifecycleManager.brokerEpoch
+        () => lifecycleManager.brokerEpoch
       )
       alterPartitionManager.start()
 
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 50d6f0ff4a3..8b0d3950c17 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -56,7 +56,7 @@ import org.apache.kafka.server.config.ReplicationConfigs
 import org.apache.kafka.server.log.remote.storage.RemoteLogManager
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.network.BrokerEndPoint
-import org.apache.kafka.server.partition.PartitionListener
+import org.apache.kafka.server.partition.{AlterPartitionManager, 
PartitionListener}
 import 
org.apache.kafka.server.purgatory.DelayedProduce.PartitionStatusValidator.Result
 import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, 
DelayedOperationPurgatory, DelayedProduce, DelayedRemoteFetch, 
DelayedRemoteListOffsets, DeleteRecordsPartitionStatus, 
ListOffsetsPartitionStatus, TopicPartitionOperationKey}
 import org.apache.kafka.server.quota.{ReplicaQuota, ReplicationQuotaManager}
diff --git a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
index 3d8e4a19d60..71f33100094 100644
--- a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
+++ b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
@@ -35,6 +35,7 @@ import org.apache.kafka.metadata.KRaftMetadataCache
 import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion, 
OffsetAndEpoch}
 import org.apache.kafka.server.network.BrokerEndPoint
 import org.apache.kafka.server.LeaderEndPoint
+import org.apache.kafka.server.partition.AlterPartitionManager
 import org.apache.kafka.server.util.{MockScheduler, MockTime}
 import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, 
LogDirFailureChannel}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
diff --git a/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
index 7ec1bc56e3a..2375edb43ba 100644
--- a/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
@@ -17,7 +17,6 @@
 package kafka.cluster
 
 import kafka.utils.TestUtils
-import kafka.utils.TestUtils.MockAlterPartitionManager
 import org.apache.kafka.common.{DirectoryId, TopicPartition, Uuid}
 import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.utils.Utils
@@ -26,6 +25,7 @@ import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.config.ReplicationConfigs
 import org.apache.kafka.server.partition.AlterPartitionListener
 import org.apache.kafka.server.util.MockTime
+import org.apache.kafka.server.util.MockAlterPartitionManager
 import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints
 import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, 
LogManager}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
@@ -78,7 +78,7 @@ class AbstractPartitionTest {
       new CleanerConfig(false), time, transactionVerificationEnabled = true)
     logManager.startup(util.Set.of)
 
-    alterPartitionManager = TestUtils.createAlterIsrManager()
+    alterPartitionManager = new MockAlterPartitionManager()
     alterPartitionListener = createIsrChangeListener()
     partition = new Partition(topicPartition,
       replicaLagTimeMaxMs = ReplicationConfigs.REPLICA_LAG_TIME_MAX_MS_DEFAULT,
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
index d425b08932d..1c2f19c95c7 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
@@ -22,7 +22,6 @@ import java.util
 import java.util.{Optional, Properties}
 import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicBoolean
-import kafka.server._
 import kafka.utils._
 import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.record.internal.{MemoryRecords, SimpleRecord}
@@ -33,7 +32,7 @@ import 
org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState, 
MetadataCache, MockConfigRepository, PartitionRegistration}
 import org.apache.kafka.server.common.{RequestLocal, TopicIdPartition}
 import org.apache.kafka.server.config.ReplicationConfigs
-import org.apache.kafka.server.partition.{AlterPartitionListener, 
CommittedPartitionState, PendingShrinkIsr}
+import org.apache.kafka.server.partition.{AlterPartitionListener, 
AlterPartitionManager, CommittedPartitionState, PendingShrinkIsr}
 import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams}
 import org.apache.kafka.server.util.MockTime
 import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 0f4a94cb05f..40f385e6a22 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -54,7 +54,7 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, 
SecurityProtocol}
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, 
NodeToControllerChannelManager, RequestLocal}
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
-import org.apache.kafka.server.partition.{AlterPartitionListener, 
OngoingReassignmentState, PartitionListener, PendingShrinkIsr, 
SimpleAssignmentState}
+import org.apache.kafka.server.partition.{AlterPartitionListener, 
AlterPartitionManager, DefaultAlterPartitionManager, OngoingReassignmentState, 
PartitionListener, PendingShrinkIsr, SimpleAssignmentState}
 import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, 
DelayedOperationPurgatory, DelayedProduce, TopicPartitionOperationKey}
 import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey
 import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, 
UnexpectedAppendOffsetException}
@@ -1270,7 +1270,7 @@ class PartitionTest extends AbstractPartitionTest {
     // Expansion does not affect the ISR
     assertEquals(util.Set.of(leader, follower2), partition.partitionState.isr, 
"ISR")
     assertEquals(util.Set.of(leader, follower1, follower2), 
partition.partitionState.maximalIsr, "ISR")
-    
assertEquals(alterPartitionManager.isrUpdates.head.leaderAndIsr.isr.asScala.toSet,
+    
assertEquals(alterPartitionManager.isrUpdates.peek().leaderAndIsr.isr.asScala.toSet,
       Set(leader, follower1, follower2), "AlterIsr")
   }
 
@@ -1511,7 +1511,7 @@ class PartitionTest extends AbstractPartitionTest {
     assertEquals(Set(brokerId), partition.inSyncReplicaIds)
     assertEquals(util.Set.of(brokerId, remoteBrokerId), 
partition.partitionState.maximalIsr)
     assertEquals(1, alterPartitionManager.isrUpdates.size)
-    assertEquals(Set(brokerId, remoteBrokerId), 
alterPartitionManager.isrUpdates.head.leaderAndIsr.isr.asScala.toSet)
+    assertEquals(Set(brokerId, remoteBrokerId), 
alterPartitionManager.isrUpdates.peek().leaderAndIsr.isr.asScala.toSet)
 
     // Simulate invalid request failure
     alterPartitionManager.failIsrUpdate(Errors.INVALID_REQUEST)
@@ -1566,7 +1566,7 @@ class PartitionTest extends AbstractPartitionTest {
 
     fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = 10L)
     assertEquals(alterPartitionManager.isrUpdates.size, 1)
-    val isrItem = alterPartitionManager.isrUpdates.head
+    val isrItem = alterPartitionManager.isrUpdates.peek()
     assertEquals(isrItem.leaderAndIsr.isr, util.Set.of[Integer](brokerId, 
remoteBrokerId))
     isrItem.leaderAndIsr.isrWithBrokerEpoch.asScala.foreach { brokerState =>
       // the broker epochs should be equal to broker epoch of the leader
@@ -1830,7 +1830,7 @@ class PartitionTest extends AbstractPartitionTest {
     assertEquals(1, alterPartitionManager.isrUpdates.size)
 
     // Expansion succeeds.
-    alterPartitionManager.completeIsrUpdate(newPartitionEpoch = 1)
+    alterPartitionManager.completeIsrUpdate(1)
 
     // ISR is committed.
     assertEquals(replicas.toSet.asJava, partition.partitionState.isr)
@@ -1923,7 +1923,7 @@ class PartitionTest extends AbstractPartitionTest {
     assertEquals(isr.toSet.asJava, partition.partitionState.isr)
     assertEquals(replicas.toSet.asJava, partition.partitionState.maximalIsr)
     assertEquals(1, alterPartitionManager.isrUpdates.size)
-    val isrUpdate = alterPartitionManager.isrUpdates.head
+    val isrUpdate = alterPartitionManager.isrUpdates.peek()
     isrUpdate.leaderAndIsr.isrWithBrokerEpoch.asScala.foreach { brokerState =>
       if (brokerState.brokerId() == remoteBrokerId2) {
         // remoteBrokerId2 has not received any fetch request yet, it does not 
have broker epoch.
@@ -2085,7 +2085,7 @@ class PartitionTest extends AbstractPartitionTest {
     assertEquals(1, alterPartitionManager.isrUpdates.size)
 
     // Expansion succeeds.
-    alterPartitionManager.completeIsrUpdate(newPartitionEpoch= 1)
+    alterPartitionManager.completeIsrUpdate(1)
 
     // ISR is committed.
     assertEquals(replicas.toSet.asJava, partition.partitionState.isr)
@@ -2121,7 +2121,7 @@ class PartitionTest extends AbstractPartitionTest {
     // Try to shrink the ISR
     partition.maybeShrinkIsr()
     assertEquals(alterPartitionManager.isrUpdates.size, 1)
-    assertEquals(alterPartitionManager.isrUpdates.head.leaderAndIsr.isr, 
util.Set.of[Integer](brokerId))
+    assertEquals(alterPartitionManager.isrUpdates.peek().leaderAndIsr.isr, 
util.Set.of[Integer](brokerId))
     assertEquals(util.Set.of(brokerId, remoteBrokerId), 
partition.partitionState.isr)
     assertEquals(util.Set.of(brokerId, remoteBrokerId), 
partition.partitionState.maximalIsr)
 
@@ -2136,7 +2136,7 @@ class PartitionTest extends AbstractPartitionTest {
     assertEquals(0L, partition.localLogOrException.highWatermark)
 
     // The shrink succeeds after retrying
-    alterPartitionManager.completeIsrUpdate(newPartitionEpoch = 2)
+    alterPartitionManager.completeIsrUpdate(2)
     assertEquals(1, alterPartitionListener.shrinks.get)
     assertEquals(2, partition.getPartitionEpoch)
     assertEquals(alterPartitionManager.isrUpdates.size, 0)
@@ -2206,8 +2206,8 @@ class PartitionTest extends AbstractPartitionTest {
     partition.maybeShrinkIsr()
     assertEquals(0, alterPartitionListener.shrinks.get)
     assertEquals(alterPartitionManager.isrUpdates.size, 1)
-    assertEquals(alterPartitionManager.isrUpdates.head.leaderAndIsr.isr, 
util.Set.of[Integer](brokerId, remoteBrokerId1))
-    val isrUpdate = alterPartitionManager.isrUpdates.head
+    assertEquals(alterPartitionManager.isrUpdates.peek().leaderAndIsr.isr, 
util.Set.of[Integer](brokerId, remoteBrokerId1))
+    val isrUpdate = alterPartitionManager.isrUpdates.peek()
     isrUpdate.leaderAndIsr.isrWithBrokerEpoch.asScala.foreach { brokerState =>
       assertEquals(defaultBrokerEpoch(brokerState.brokerId()), 
brokerState.brokerEpoch())
     }
@@ -2217,7 +2217,7 @@ class PartitionTest extends AbstractPartitionTest {
 
     // After the ISR shrink completes, the ISR state should be updated and the
     // high watermark should be advanced
-    alterPartitionManager.completeIsrUpdate(newPartitionEpoch = 2)
+    alterPartitionManager.completeIsrUpdate(2)
     assertEquals(1, alterPartitionListener.shrinks.get)
     assertEquals(2, partition.getPartitionEpoch)
     assertEquals(alterPartitionManager.isrUpdates.size, 0)
@@ -2601,11 +2601,10 @@ class PartitionTest extends AbstractPartitionTest {
   def testPartitionShouldRetryAlterPartitionRequest(): Unit = {
     val mockChannelManager = mock(classOf[NodeToControllerChannelManager])
     val alterPartitionManager = new DefaultAlterPartitionManager(
-      controllerChannelManager = mockChannelManager,
-      scheduler = mock(classOf[KafkaScheduler]),
-      time = time,
-      brokerId = brokerId,
-      brokerEpochSupplier = () => 0
+      mockChannelManager,
+      mock(classOf[KafkaScheduler]),
+      brokerId,
+      () => 0
     )
 
     partition = new Partition(topicPartition,
diff --git 
a/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala
deleted file mode 100644
index 4793723bc6a..00000000000
--- a/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala
+++ /dev/null
@@ -1,533 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import java.util.Collections
-import org.apache.kafka.clients.ClientResponse
-import org.apache.kafka.common.Uuid
-import org.apache.kafka.common.errors.{AuthenticationException, 
OperationNotAttemptedException, UnknownServerException, 
UnsupportedVersionException}
-import org.apache.kafka.common.message.AlterPartitionRequestData.BrokerState
-import org.apache.kafka.common.message.{AlterPartitionRequestData, 
AlterPartitionResponseData}
-import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.protocol.MessageUtil
-import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.RequestHeader
-import org.apache.kafka.common.requests.{AbstractRequest, 
AlterPartitionRequest, AlterPartitionResponse}
-import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState}
-import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, 
NodeToControllerChannelManager, TopicIdPartition}
-import org.apache.kafka.server.util.{MockScheduler, MockTime}
-import org.apache.kafka.test.TestUtils.assertFutureThrows
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.BeforeEach
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.EnumSource
-import org.mockito.ArgumentMatcher
-import org.mockito.ArgumentMatchers.any
-import org.mockito.Mockito.{mock, reset, times, verify}
-import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
-
-import java.util.concurrent.{CompletableFuture, TimeUnit}
-import scala.collection.mutable.ListBuffer
-import scala.jdk.CollectionConverters._
-
-class AlterPartitionManagerTest {
-  val topicId = Uuid.randomUuid()
-  val time = new MockTime
-  val metrics = new Metrics
-  val brokerId = 1
-
-  var brokerToController: NodeToControllerChannelManager = _
-
-  val tp0 = new TopicIdPartition(topicId, 0)
-  val tp1 = new TopicIdPartition(topicId, 1)
-  val tp2 = new TopicIdPartition(topicId, 2)
-
-  @BeforeEach
-  def setup(): Unit = {
-    brokerToController = mock(classOf[NodeToControllerChannelManager])
-  }
-
-  @Test
-  def testBasic(): Unit = {
-    val scheduler = new MockScheduler(time)
-    val alterPartitionManager = new 
DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () 
=> 2)
-    alterPartitionManager.start()
-    alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1, 2, 
3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10))
-    verify(brokerToController).start()
-    verify(brokerToController).sendRequest(any(), any())
-  }
-
-  @Test
-  def testBasicWithBrokerEpoch(): Unit = {
-    val scheduler = new MockScheduler(time)
-    val alterPartitionManager = new 
DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () 
=> 101)
-    alterPartitionManager.start()
-    val isrWithBrokerEpoch = ListBuffer[BrokerState]()
-    for (ii <- 1 to 3) {
-      isrWithBrokerEpoch += new 
BrokerState().setBrokerId(ii).setBrokerEpoch(100 + ii)
-    }
-    alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, 
LeaderRecoveryState.RECOVERED, isrWithBrokerEpoch.toList.asJava, 10))
-
-    val expectedAlterPartitionData = new AlterPartitionRequestData()
-      .setBrokerId(brokerId)
-      .setBrokerEpoch(101)
-    val topicData = new AlterPartitionRequestData.TopicData()
-      .setTopicId(topicId)
-
-    val newIsrWithBrokerEpoch = new ListBuffer[BrokerState]()
-    newIsrWithBrokerEpoch.append(new 
BrokerState().setBrokerId(1).setBrokerEpoch(101))
-    newIsrWithBrokerEpoch.append(new 
BrokerState().setBrokerId(2).setBrokerEpoch(102))
-    newIsrWithBrokerEpoch.append(new 
BrokerState().setBrokerId(3).setBrokerEpoch(103))
-    topicData.partitions.add(new AlterPartitionRequestData.PartitionData()
-      .setPartitionIndex(0)
-      .setLeaderEpoch(1)
-      .setPartitionEpoch(10)
-      .setNewIsrWithEpochs(newIsrWithBrokerEpoch.toList.asJava))
-
-    expectedAlterPartitionData.topics.add(topicData)
-
-    verify(brokerToController).start()
-    val captor = ArgumentCaptor.forClass(classOf[AbstractRequest.Builder[_ <: 
AbstractRequest]])
-    verify(brokerToController).sendRequest(captor.capture(), any())
-    assertEquals(expectedAlterPartitionData, 
captor.getValue.asInstanceOf[AlterPartitionRequest.Builder].build().data())
-  }
-
-  @ParameterizedTest
-  @EnumSource(classOf[LeaderRecoveryState])
-  def testBasicSentLeaderRecoveryState(leaderRecoveryState: 
LeaderRecoveryState): Unit = {
-    val requestCapture = 
ArgumentCaptor.forClass(classOf[AbstractRequest.Builder[AlterPartitionRequest]])
-
-    val scheduler = new MockScheduler(time)
-    val alterPartitionManager = new 
DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () 
=> 2)
-    alterPartitionManager.start()
-    alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, 
List(1).map(Int.box).asJava, leaderRecoveryState, 10))
-    verify(brokerToController).start()
-    verify(brokerToController).sendRequest(requestCapture.capture(), any())
-
-    val request = requestCapture.getValue.build()
-    assertEquals(leaderRecoveryState.value, 
request.data.topics.get(0).partitions.get(0).leaderRecoveryState())
-  }
-
-  @Test
-  def testOverwriteWithinBatch(): Unit = {
-    val capture: 
ArgumentCaptor[AbstractRequest.Builder[AlterPartitionRequest]] = 
ArgumentCaptor.forClass(classOf[AbstractRequest.Builder[AlterPartitionRequest]])
-    val callbackCapture: ArgumentCaptor[ControllerRequestCompletionHandler] = 
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
-
-    val scheduler = new MockScheduler(time)
-    val alterPartitionManager = new 
DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () 
=> 2)
-    alterPartitionManager.start()
-
-    // Only send one ISR update for a given topic+partition
-    val firstSubmitFuture = alterPartitionManager.submit(tp0, new 
LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, 
LeaderRecoveryState.RECOVERED, 10))
-    assertFalse(firstSubmitFuture.isDone)
-
-    val failedSubmitFuture = alterPartitionManager.submit(tp0, new 
LeaderAndIsr(1, 1, List(1, 2).map(Int.box).asJava, 
LeaderRecoveryState.RECOVERED, 10))
-    assertTrue(failedSubmitFuture.isCompletedExceptionally)
-    assertFutureThrows(classOf[OperationNotAttemptedException], 
failedSubmitFuture)
-
-    // Simulate response
-    val alterPartitionResp = partitionResponse()
-    val resp = makeClientResponse(
-      response = alterPartitionResp,
-      version = ApiKeys.ALTER_PARTITION.latestVersion
-    )
-    verify(brokerToController).sendRequest(capture.capture(), 
callbackCapture.capture())
-    callbackCapture.getValue.onComplete(resp)
-
-    // Now we can submit this partition again
-    val newSubmitFuture = alterPartitionManager.submit(tp0, new 
LeaderAndIsr(1, 1, List(1).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 
10))
-    assertFalse(newSubmitFuture.isDone)
-
-    verify(brokerToController).start()
-    verify(brokerToController, times(2)).sendRequest(capture.capture(), 
callbackCapture.capture())
-
-    // Make sure we sent the right request ISR={1}
-    val request = capture.getValue.build()
-    assertEquals(request.data().topics().size(), 1)
-    if (request.version() < 3) {
-      assertEquals(request.data.topics.get(0).partitions.get(0).newIsr.size, 1)
-    } else {
-      
assertEquals(request.data.topics.get(0).partitions.get(0).newIsrWithEpochs.size,
 1)
-    }
-  }
-
-  @Test
-  def testSingleBatch(): Unit = {
-    val capture: 
ArgumentCaptor[AbstractRequest.Builder[AlterPartitionRequest]] = 
ArgumentCaptor.forClass(classOf[AbstractRequest.Builder[AlterPartitionRequest]])
-    val callbackCapture: ArgumentCaptor[ControllerRequestCompletionHandler] = 
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
-
-    val scheduler = new MockScheduler(time)
-    val alterPartitionManager = new 
DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () 
=> 2)
-    alterPartitionManager.start()
-
-    // First request will send batch of one
-    alterPartitionManager.submit(new TopicIdPartition(topicId, 0),
-      new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, 
LeaderRecoveryState.RECOVERED, 10))
-
-    // Other submissions will queue up until a response
-    for (i <- 1 to 9) {
-      alterPartitionManager.submit(new TopicIdPartition(topicId, i),
-        new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, 
LeaderRecoveryState.RECOVERED, 10))
-    }
-
-    // Simulate response, omitting partition 0 will allow it to stay in unsent 
queue
-    val alterPartitionResp = new AlterPartitionResponse(new 
AlterPartitionResponseData())
-    val resp = new ClientResponse(null, null, "", 0L, 0L,
-      false, null, null, alterPartitionResp)
-
-    // On the callback, we check for unsent items and send another request
-    verify(brokerToController).sendRequest(capture.capture(), 
callbackCapture.capture())
-    callbackCapture.getValue.onComplete(resp)
-
-    verify(brokerToController).start()
-    verify(brokerToController, times(2)).sendRequest(capture.capture(), 
callbackCapture.capture())
-
-    // Verify the last request sent had all 10 items
-    val request = capture.getValue.build()
-    assertEquals(request.data().topics().size(), 1)
-    assertEquals(request.data().topics().get(0).partitions().size(), 10)
-  }
-
-  @Test
-  def testSubmitFromCallback(): Unit = {
-    // prepare a partition level retriable error response
-    val alterPartitionRespWithPartitionError = partitionResponse(tp0, 
Errors.UNKNOWN_SERVER_ERROR)
-    val errorResponse = 
makeClientResponse(alterPartitionRespWithPartitionError, 
ApiKeys.ALTER_PARTITION.latestVersion)
-
-    val leaderId = 1
-    val leaderEpoch = 1
-    val partitionEpoch = 10
-    val isr = List(1, 2, 3)
-    val leaderAndIsr = new LeaderAndIsr(leaderId, leaderEpoch, 
isr.map(Int.box).asJava, LeaderRecoveryState.RECOVERED, partitionEpoch)
-    val callbackCapture = 
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
-
-    val scheduler = new MockScheduler(time)
-    val alterPartitionManager = new 
DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () 
=> 2)
-    alterPartitionManager.start()
-    val future = alterPartitionManager.submit(tp0, leaderAndIsr)
-    val finalFuture = new CompletableFuture[LeaderAndIsr]()
-    future.whenComplete { (_, e) =>
-      if (e != null) {
-        // Retry when error.
-        alterPartitionManager.submit(tp0, leaderAndIsr).whenComplete { 
(result, e) =>
-          if (e != null) {
-            finalFuture.completeExceptionally(e)
-          } else {
-            finalFuture.complete(result)
-          }
-        }
-      } else {
-        finalFuture.completeExceptionally(new AssertionError("Expected the 
future to be failed"))
-      }
-    }
-
-    verify(brokerToController).start()
-    verify(brokerToController).sendRequest(any(), callbackCapture.capture())
-    reset(brokerToController)
-    callbackCapture.getValue.onComplete(errorResponse)
-
-    // Complete the retry request
-    val retryAlterPartitionResponse = partitionResponse(tp0, Errors.NONE, 
partitionEpoch, leaderId, leaderEpoch, isr)
-    val retryResponse = makeClientResponse(retryAlterPartitionResponse, 
ApiKeys.ALTER_PARTITION.latestVersion)
-
-    verify(brokerToController).sendRequest(any(), callbackCapture.capture())
-    callbackCapture.getValue.onComplete(retryResponse)
-
-    assertEquals(leaderAndIsr, finalFuture.get(200, TimeUnit.MILLISECONDS))
-    // No more items in unsentIsrUpdates
-    assertFalse(alterPartitionManager.unsentIsrUpdates.containsKey(tp0))
-  }
-
-  @Test
-  def testAuthorizationFailed(): Unit = {
-    testRetryOnTopLevelError(Errors.CLUSTER_AUTHORIZATION_FAILED)
-  }
-
-  @Test
-  def testStaleBrokerEpoch(): Unit = {
-    testRetryOnTopLevelError(Errors.STALE_BROKER_EPOCH)
-  }
-
-  @Test
-  def testUnknownServer(): Unit = {
-    testRetryOnTopLevelError(Errors.UNKNOWN_SERVER_ERROR)
-  }
-
-  @Test
-  def testRetryOnAuthenticationFailure(): Unit = {
-    testRetryOnErrorResponse(new ClientResponse(null, null, "", 0L, 0L,
-      false, null, new AuthenticationException("authentication failed"), null))
-  }
-
-  @Test
-  def testRetryOnUnsupportedVersionError(): Unit = {
-    testRetryOnErrorResponse(new ClientResponse(null, null, "", 0L, 0L,
-      false, new UnsupportedVersionException("unsupported version"), null, 
null))
-  }
-
-  private def testRetryOnTopLevelError(error: Errors): Unit = {
-    val alterPartitionResp = new AlterPartitionResponse(new 
AlterPartitionResponseData().setErrorCode(error.code))
-    val response = makeClientResponse(alterPartitionResp, 
ApiKeys.ALTER_PARTITION.latestVersion)
-    testRetryOnErrorResponse(response)
-  }
-
-  private def testRetryOnErrorResponse(response: ClientResponse): Unit = {
-    val leaderAndIsr = new LeaderAndIsr(1, 1, List(1, 2, 
3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10)
-    val callbackCapture: ArgumentCaptor[ControllerRequestCompletionHandler] = 
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
-
-    val scheduler = new MockScheduler(time)
-    val alterPartitionManager = new 
DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () 
=> 2)
-    alterPartitionManager.start()
-    alterPartitionManager.submit(tp0, leaderAndIsr)
-
-    verify(brokerToController).start()
-    verify(brokerToController).sendRequest(any(), callbackCapture.capture())
-    callbackCapture.getValue.onComplete(response)
-
-    // Any top-level error, we want to retry, so we don't clear items from the 
pending map
-    assertTrue(alterPartitionManager.unsentIsrUpdates.containsKey(tp0))
-
-    reset(brokerToController)
-
-    // After some time, we will retry failed requests
-    time.sleep(100)
-    scheduler.tick()
-
-    // After a successful response, we can submit another AlterIsrItem
-    val retryAlterPartitionResponse = partitionResponse()
-    val retryResponse = makeClientResponse(retryAlterPartitionResponse, 
ApiKeys.ALTER_PARTITION.latestVersion)
-
-    verify(brokerToController).sendRequest(any(), callbackCapture.capture())
-    callbackCapture.getValue.onComplete(retryResponse)
-
-    assertFalse(alterPartitionManager.unsentIsrUpdates.containsKey(tp0))
-  }
-
-  @Test
-  def testInvalidUpdateVersion(): Unit = {
-    checkPartitionError(Errors.INVALID_UPDATE_VERSION)
-  }
-
-  @Test
-  def testUnknownTopicPartition(): Unit = {
-    checkPartitionError(Errors.UNKNOWN_TOPIC_OR_PARTITION)
-  }
-
-  @Test
-  def testNotLeaderOrFollower(): Unit = {
-    checkPartitionError(Errors.NOT_LEADER_OR_FOLLOWER)
-  }
-
-  @Test
-  def testInvalidRequest(): Unit = {
-    checkPartitionError(Errors.INVALID_REQUEST)
-  }
-
-  private def checkPartitionError(error: Errors): Unit = {
-    val alterPartitionManager = testPartitionError(tp0, error)
-    // Any partition-level error should clear the item from the pending queue 
allowing for future updates
-    val future = alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, 
List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10))
-    assertFalse(future.isDone)
-  }
-
-  private def testPartitionError(tp: TopicIdPartition, error: Errors): 
AlterPartitionManager = {
-    val callbackCapture: ArgumentCaptor[ControllerRequestCompletionHandler] = 
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
-    reset(brokerToController)
-
-    val scheduler = new MockScheduler(time)
-    val alterPartitionManager = new 
DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () 
=> 2)
-    alterPartitionManager.start()
-
-    val future = alterPartitionManager.submit(tp, new LeaderAndIsr(1, 1, 
List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10))
-
-    verify(brokerToController).start()
-    verify(brokerToController).sendRequest(any(), callbackCapture.capture())
-    reset(brokerToController)
-
-    val alterPartitionResp = partitionResponse(tp, error)
-    val resp = makeClientResponse(alterPartitionResp, 
ApiKeys.ALTER_PARTITION.latestVersion)
-    callbackCapture.getValue.onComplete(resp)
-    assertTrue(future.isCompletedExceptionally)
-    assertFutureThrows(error.exception.getClass, future)
-    alterPartitionManager
-  }
-
-  @Test
-  def testOneInFlight(): Unit = {
-    val callbackCapture: ArgumentCaptor[ControllerRequestCompletionHandler] = 
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
-
-    val scheduler = new MockScheduler(time)
-    val alterPartitionManager = new 
DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () 
=> 2)
-    alterPartitionManager.start()
-
-    // First submit will send the request
-    alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1, 2, 
3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10))
-
-    // These will become pending unsent items
-    alterPartitionManager.submit(tp1, new LeaderAndIsr(1, 1, List(1, 2, 
3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10))
-    alterPartitionManager.submit(tp2, new LeaderAndIsr(1, 1, List(1, 2, 
3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10))
-
-    verify(brokerToController).start()
-    verify(brokerToController).sendRequest(any(), callbackCapture.capture())
-
-    // Once the callback runs, another request will be sent
-    reset(brokerToController)
-
-    val alterPartitionResp = new AlterPartitionResponse(new 
AlterPartitionResponseData())
-    val resp = makeClientResponse(alterPartitionResp, 
ApiKeys.ALTER_PARTITION.latestVersion)
-    callbackCapture.getValue.onComplete(resp)
-  }
-
-  @Test
-  def testPartitionMissingInResponse(): Unit = {
-    val expectedVersion = ApiKeys.ALTER_PARTITION.latestVersion
-    val leaderAndIsr = new LeaderAndIsr(1, 1, List(1, 2, 
3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10)
-    val brokerEpoch = 2
-    val scheduler = new MockScheduler(time)
-    val brokerToController = 
Mockito.mock(classOf[NodeToControllerChannelManager])
-    val alterPartitionManager = new DefaultAlterPartitionManager(
-      brokerToController,
-      scheduler,
-      time,
-      brokerId,
-      () => brokerEpoch
-    )
-    alterPartitionManager.start()
-
-    // The first `submit` will send the `AlterIsr` request
-    val future1 = alterPartitionManager.submit(tp0, leaderAndIsr)
-    val callback1 = verifySendRequest(brokerToController, 
alterPartitionRequestMatcher(
-      expectedTopicPartitions = Set(tp0),
-      expectedVersion = expectedVersion
-    ))
-
-    // Additional calls while the `AlterIsr` request is inflight will be queued
-    val future2 = alterPartitionManager.submit(tp1, leaderAndIsr)
-    val future3 = alterPartitionManager.submit(tp2, leaderAndIsr)
-
-    // Respond to the first request, which will also allow the next request to 
get sent
-    callback1.onComplete(makeClientResponse(
-      response = partitionResponse(tp0, Errors.UNKNOWN_SERVER_ERROR),
-      version = expectedVersion
-    ))
-    assertFutureThrows(classOf[UnknownServerException], future1)
-    assertFalse(future2.isDone)
-    assertFalse(future3.isDone)
-
-    // Verify the second request includes both expected partitions, but only 
respond with one of them
-    val callback2 = verifySendRequest(brokerToController, 
alterPartitionRequestMatcher(
-      expectedTopicPartitions = Set(tp1, tp2),
-      expectedVersion = expectedVersion
-    ))
-    callback2.onComplete(makeClientResponse(
-      response = partitionResponse(tp2, Errors.UNKNOWN_SERVER_ERROR),
-      version = expectedVersion
-    ))
-    assertFutureThrows(classOf[UnknownServerException], future3)
-    assertFalse(future2.isDone)
-
-    // The missing partition should be retried
-    val callback3 = verifySendRequest(brokerToController, 
alterPartitionRequestMatcher(
-      expectedTopicPartitions = Set(tp1),
-      expectedVersion = expectedVersion
-    ))
-    callback3.onComplete(makeClientResponse(
-      response = partitionResponse(tp1, Errors.UNKNOWN_SERVER_ERROR),
-      version = expectedVersion
-    ))
-    assertFutureThrows(classOf[UnknownServerException], future2)
-  }
-
-  private def verifySendRequest(
-    brokerToController: NodeToControllerChannelManager,
-    expectedRequest: ArgumentMatcher[AbstractRequest.Builder[_ <: 
AbstractRequest]]
-  ): ControllerRequestCompletionHandler = {
-    val callbackCapture = 
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
-
-    Mockito.verify(brokerToController).sendRequest(
-      ArgumentMatchers.argThat(expectedRequest),
-      callbackCapture.capture()
-    )
-
-    Mockito.reset(brokerToController)
-
-    callbackCapture.getValue
-  }
-
-  private def alterPartitionRequestMatcher(
-    expectedTopicPartitions: Set[TopicIdPartition],
-    expectedVersion: Short
-  ): ArgumentMatcher[AbstractRequest.Builder[_ <: AbstractRequest]] = {
-    request => {
-      assertEquals(ApiKeys.ALTER_PARTITION, request.apiKey)
-
-      val alterPartitionRequest = 
request.asInstanceOf[AlterPartitionRequest.Builder].build()
-      assertEquals(expectedVersion, alterPartitionRequest.version)
-
-      val requestTopicIdPartitions = 
alterPartitionRequest.data.topics.asScala.flatMap { topicData =>
-        topicData.partitions.asScala.map { partitionData =>
-          (topicData.topicId, partitionData.partitionIndex)
-        }
-      }.toSet
-
-      expectedTopicPartitions.map(tp => (tp.topicId, tp.partitionId)) == 
requestTopicIdPartitions
-    }
-  }
-
-  private def makeClientResponse(
-    response: AlterPartitionResponse,
-    version: Short
-  ): ClientResponse = {
-    new ClientResponse(
-      new RequestHeader(response.apiKey, version, "", 0),
-      null,
-      "",
-      0L,
-      0L,
-      false,
-      null,
-      null,
-      // Response is serialized and deserialized to ensure that its does
-      // not contain ignorable fields used by other versions.
-      
AlterPartitionResponse.parse(MessageUtil.toByteBufferAccessor(response.data, 
version), version)
-    )
-  }
-
-  private def partitionResponse(
-    tp: TopicIdPartition = tp0,
-    error: Errors = Errors.NONE,
-    partitionEpoch: Int = 0,
-    leaderId: Int = 0,
-    leaderEpoch: Int = 0,
-    isr: List[Int] = List.empty
-  ): AlterPartitionResponse = {
-    new AlterPartitionResponse(new AlterPartitionResponseData()
-      .setTopics(Collections.singletonList(
-        new AlterPartitionResponseData.TopicData()
-          .setTopicId(tp.topicId)
-          .setPartitions(Collections.singletonList(
-            new AlterPartitionResponseData.PartitionData()
-              .setPartitionIndex(tp.partitionId)
-              .setPartitionEpoch(partitionEpoch)
-              .setLeaderEpoch(leaderEpoch)
-              .setLeaderId(leaderId)
-              .setIsr(isr.map(Integer.valueOf).asJava)
-              .setErrorCode(error.code))))))
-  }
-}
diff --git 
a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 
b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index ff729c79699..eba5042f0a7 100755
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -28,7 +28,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record.internal.SimpleRecord
 import org.apache.kafka.metadata.{KRaftMetadataCache, MockConfigRepository}
 import org.apache.kafka.server.common.KRaftVersion
-import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
+import org.apache.kafka.server.util.{KafkaScheduler, MockTime, 
MockAlterPartitionManager}
 import org.apache.kafka.storage.internals.log.{CleanerConfig, 
LogDirFailureChannel}
 
 import java.util.Optional
@@ -50,7 +50,7 @@ class HighwatermarkPersistenceTest {
     new LogDirFailureChannel(config.logDirs.size)
   }
 
-  val alterIsrManager = TestUtils.createAlterIsrManager()
+  val alterIsrManager = new MockAlterPartitionManager()
 
   @AfterEach
   def teardown(): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala 
b/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
index 43cdef2ffb1..2c1a5fdb5de 100644
--- a/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
@@ -20,7 +20,6 @@ import java.util
 import java.util.Properties
 import kafka.cluster.Partition
 import kafka.server.QuotaFactory.QuotaManagers
-import kafka.utils.TestUtils.MockAlterPartitionManager
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.Metrics
@@ -29,7 +28,7 @@ import org.apache.kafka.metadata.KRaftMetadataCache
 import org.apache.kafka.metadata.LeaderRecoveryState
 import org.apache.kafka.server.common.KRaftVersion
 import org.apache.kafka.server.config.ReplicationConfigs
-import org.apache.kafka.server.util.MockTime
+import org.apache.kafka.server.util.{MockAlterPartitionManager, MockTime}
 import org.apache.kafka.storage.internals.log.{LogDirFailureChannel, 
LogManager, LogOffsetMetadata, UnifiedLog}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -64,7 +63,7 @@ class IsrExpirationTest {
     val logManager: LogManager = mock(classOf[LogManager])
     when(logManager.liveLogDirs).thenReturn(util.List.of)
 
-    alterIsrManager = TestUtils.createAlterIsrManager()
+    alterIsrManager = new MockAlterPartitionManager()
     quotaManager = QuotaFactory.instantiate(configs.head, metrics, time, "", 
"")
     replicaManager = new ReplicaManager(
       metrics = metrics,
diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
index 3386a8eee86..0ed1a999d71 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
@@ -43,6 +43,7 @@ import org.apache.kafka.raft.{KRaftConfigs, QuorumConfig}
 import org.apache.kafka.server.HostedPartition
 import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion, 
TopicIdPartition}
 import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
+import org.apache.kafka.server.partition.AlterPartitionManager
 import org.apache.kafka.server.quota.ReplicationQuotaManager
 import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, 
FetchPartitionData}
 import org.apache.kafka.server.util.{MockTime, ShutdownableThread}
@@ -489,6 +490,10 @@ class ReplicaManagerConcurrencyTest extends Logging {
     ): CompletableFuture[LeaderAndIsr] = {
       channel.alterIsr(topicPartition, leaderAndIsr)
     }
+
+    override def start(): Unit = {}
+
+    override def shutdown(): Unit = {}
   }
 
   private def registration(
diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index 3a5c3a6309f..fc4cfec1466 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -28,6 +28,7 @@ import 
org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.metadata.{KRaftMetadataCache, LeaderRecoveryState}
 import org.apache.kafka.server.common.KRaftVersion
+import org.apache.kafka.server.partition.AlterPartitionManager
 import org.apache.kafka.server.quota.ReplicaQuota
 import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams}
 import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 857084099ef..b38e433b292 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -68,6 +68,7 @@ import org.apache.kafka.server.network.BrokerEndPoint
 import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, 
DelayedOperationPurgatory, DelayedProduce, DelayedRemoteFetch, 
DelayedRemoteListOffsets}
 import org.apache.kafka.server.quota.{ReplicaQuota, ReplicationQuotaManager}
 import org.apache.kafka.server.{HostedPartition, PartitionFetchState}
+import org.apache.kafka.server.partition.AlterPartitionManager
 import org.apache.kafka.server.share.SharePartitionKey
 import org.apache.kafka.server.share.fetch.{DelayedShareFetchGroupKey, 
DelayedShareFetchKey, ShareFetch}
 import org.apache.kafka.server.share.metrics.ShareGroupMetrics
diff --git 
a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala 
b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
index 660e1450013..a06aff1eed7 100644
--- 
a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
@@ -28,7 +28,7 @@ import org.apache.kafka.common.record.internal.RecordBatch
 import 
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH,
 UNDEFINED_EPOCH_OFFSET}
 import org.apache.kafka.metadata.KRaftMetadataCache
 import org.apache.kafka.server.common.{KRaftVersion, OffsetAndEpoch}
-import org.apache.kafka.server.util.MockTime
+import org.apache.kafka.server.util.{MockAlterPartitionManager, MockTime}
 import org.apache.kafka.storage.internals.log.{LogDirFailureChannel, 
LogManager, UnifiedLog}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -42,7 +42,7 @@ class OffsetsForLeaderEpochTest {
   private val config = 
TestUtils.createBrokerConfigs(1).map(KafkaConfig.fromProps).head
   private val time = new MockTime
   private val metrics = new Metrics
-  private val alterIsrManager = TestUtils.createAlterIsrManager()
+  private val alterIsrManager = new MockAlterPartitionManager()
   private val tp = new TopicPartition("topic", 1)
   private var replicaManager: ReplicaManager = _
   private var quotaManager: QuotaManagers = _
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index b2f12f30331..c0bc7618a57 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -29,13 +29,13 @@ import org.apache.kafka.common._
 import org.apache.kafka.common.acl.{AccessControlEntry, 
AccessControlEntryFilter, AclBindingFilter}
 import org.apache.kafka.common.compress.Compression
 import org.apache.kafka.common.config.{ConfigException, ConfigResource}
-import org.apache.kafka.common.errors.{OperationNotAttemptedException, 
TopicExistsException, UnknownTopicOrPartitionException}
+import org.apache.kafka.common.errors.{TopicExistsException, 
UnknownTopicOrPartitionException}
 import org.apache.kafka.common.header.Header
 import org.apache.kafka.common.internals.{Plugin, Topic}
 import org.apache.kafka.common.memory.MemoryPool
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.{ClientInformation, ConnectionMode, 
ListenerName}
-import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.record.internal._
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.requests._
@@ -51,7 +51,6 @@ import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.network.metrics.RequestChannelMetrics
 import org.apache.kafka.raft.{KRaftConfigs, QuorumConfig}
 import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, 
Authorizer => JAuthorizer}
-import org.apache.kafka.server.common.TopicIdPartition
 import org.apache.kafka.server.config.{DelegationTokenManagerConfigs, 
ReplicationConfigs, ServerConfigs, ServerLogConfigs}
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.server.util.MockTime
@@ -71,7 +70,6 @@ import java.nio.file.{Files, StandardOpenOption}
 import java.time.Duration
 import java.util
 import java.util.concurrent._
-import java.util.concurrent.atomic.AtomicBoolean
 import java.util.{Optional, Properties}
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.{Map, Seq, mutable}
@@ -1004,52 +1002,6 @@ object TestUtils extends Logging {
       logManager
   }
 
-  class MockAlterPartitionManager extends AlterPartitionManager {
-    val isrUpdates: mutable.Queue[AlterPartitionItem] = new 
mutable.Queue[AlterPartitionItem]()
-    val inFlight: AtomicBoolean = new AtomicBoolean(false)
-
-
-    override def submit(
-      topicPartition: TopicIdPartition,
-      leaderAndIsr: LeaderAndIsr,
-    ): CompletableFuture[LeaderAndIsr]= {
-      val future = new CompletableFuture[LeaderAndIsr]()
-      if (inFlight.compareAndSet(false, true)) {
-        isrUpdates += AlterPartitionItem(
-          topicPartition,
-          leaderAndIsr,
-          future
-        )
-      } else {
-        future.completeExceptionally(new OperationNotAttemptedException(
-          s"Failed to enqueue AlterIsr request for $topicPartition since there 
is already an inflight request"))
-      }
-      future
-    }
-
-    def completeIsrUpdate(newPartitionEpoch: Int): Unit = {
-      if (inFlight.compareAndSet(true, false)) {
-        val item = isrUpdates.dequeue()
-        
item.future.complete(item.leaderAndIsr.withPartitionEpoch(newPartitionEpoch))
-      } else {
-        fail("Expected an in-flight ISR update, but there was none")
-      }
-    }
-
-    def failIsrUpdate(error: Errors): Unit = {
-      if (inFlight.compareAndSet(true, false)) {
-        val item = isrUpdates.dequeue()
-        item.future.completeExceptionally(error.exception)
-      } else {
-        fail("Expected an in-flight ISR update, but there was none")
-      }
-    }
-  }
-
-  def createAlterIsrManager(): MockAlterPartitionManager = {
-    new MockAlterPartitionManager()
-  }
-
   def generateAndProduceMessages[B <: KafkaBroker](
       brokers: Seq[B],
       topic: String,
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index bb4b8bc92e5..28f3577f4a5 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -18,7 +18,6 @@
 package org.apache.kafka.jmh.fetcher;
 
 import kafka.cluster.Partition;
-import kafka.server.AlterPartitionManager;
 import kafka.server.BrokerBlockingSender;
 import kafka.server.FailedPartitions;
 import kafka.server.InitialFetchState;
@@ -57,6 +56,7 @@ import org.apache.kafka.metadata.PartitionRegistration;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.common.OffsetAndEpoch;
 import org.apache.kafka.server.network.BrokerEndPoint;
+import org.apache.kafka.server.partition.AlterPartitionManager;
 import org.apache.kafka.server.quota.ReplicaQuota;
 import org.apache.kafka.server.util.KafkaScheduler;
 import org.apache.kafka.server.util.MockTime;
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
index 0ee9cf4fb35..68dddd2c8d2 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
@@ -19,7 +19,6 @@ package org.apache.kafka.jmh.partition;
 
 import kafka.cluster.DelayedOperations;
 import kafka.cluster.Partition;
-import kafka.server.AlterPartitionManager;
 import kafka.server.builders.LogManagerBuilder;
 
 import org.apache.kafka.common.DirectoryId;
@@ -35,6 +34,7 @@ import org.apache.kafka.metadata.MetadataCache;
 import org.apache.kafka.metadata.MockConfigRepository;
 import org.apache.kafka.metadata.PartitionRegistration;
 import org.apache.kafka.server.partition.AlterPartitionListener;
+import org.apache.kafka.server.partition.AlterPartitionManager;
 import org.apache.kafka.server.util.KafkaScheduler;
 import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints;
 import org.apache.kafka.storage.internals.log.CleanerConfig;
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
index c4b41a0296f..4ece85a1f45 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
@@ -19,7 +19,6 @@ package org.apache.kafka.jmh.partition;
 
 import kafka.cluster.DelayedOperations;
 import kafka.cluster.Partition;
-import kafka.server.AlterPartitionManager;
 import kafka.server.builders.LogManagerBuilder;
 
 import org.apache.kafka.common.DirectoryId;
@@ -31,6 +30,7 @@ import org.apache.kafka.metadata.MetadataCache;
 import org.apache.kafka.metadata.MockConfigRepository;
 import org.apache.kafka.metadata.PartitionRegistration;
 import org.apache.kafka.server.partition.AlterPartitionListener;
+import org.apache.kafka.server.partition.AlterPartitionManager;
 import org.apache.kafka.server.replica.Replica;
 import org.apache.kafka.server.util.KafkaScheduler;
 import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints;
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
index ed2b34af5a1..480ce855481 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.jmh.server;
 
 import kafka.cluster.Partition;
-import kafka.server.AlterPartitionManager;
 import kafka.server.KafkaConfig;
 import kafka.server.QuotaFactory;
 import kafka.server.ReplicaManager;
@@ -33,6 +32,7 @@ import org.apache.kafka.jmh.util.BenchmarkConfigUtils;
 import org.apache.kafka.metadata.KRaftMetadataCache;
 import org.apache.kafka.metadata.MetadataCache;
 import org.apache.kafka.metadata.MockConfigRepository;
+import org.apache.kafka.server.partition.AlterPartitionManager;
 import org.apache.kafka.server.util.KafkaScheduler;
 import org.apache.kafka.server.util.MockTime;
 import org.apache.kafka.server.util.Scheduler;
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
index 1692e2a64d7..5d1ac018bd7 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.jmh.server;
 
 import kafka.cluster.Partition;
-import kafka.server.AlterPartitionManager;
 import kafka.server.KafkaConfig;
 import kafka.server.QuotaFactory;
 import kafka.server.ReplicaManager;
@@ -36,6 +35,7 @@ import org.apache.kafka.metadata.KRaftMetadataCache;
 import org.apache.kafka.metadata.LeaderRecoveryState;
 import org.apache.kafka.metadata.MockConfigRepository;
 import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.server.partition.AlterPartitionManager;
 import org.apache.kafka.server.util.KafkaScheduler;
 import org.apache.kafka.server.util.Scheduler;
 import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints;
diff --git 
a/server/src/main/java/org/apache/kafka/server/partition/AlterPartitionItem.java
 
b/server/src/main/java/org/apache/kafka/server/partition/AlterPartitionItem.java
new file mode 100644
index 00000000000..b92b128d709
--- /dev/null
+++ 
b/server/src/main/java/org/apache/kafka/server/partition/AlterPartitionItem.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.partition;
+
+import org.apache.kafka.metadata.LeaderAndIsr;
+import org.apache.kafka.server.common.TopicIdPartition;
+
+import java.util.concurrent.CompletableFuture;
+
+public record AlterPartitionItem(TopicIdPartition topicIdPartition, 
LeaderAndIsr leaderAndIsr,
+                                 CompletableFuture<LeaderAndIsr> future) {
+}
diff --git 
a/server/src/main/java/org/apache/kafka/server/partition/AlterPartitionManager.java
 
b/server/src/main/java/org/apache/kafka/server/partition/AlterPartitionManager.java
new file mode 100644
index 00000000000..076f8eccf85
--- /dev/null
+++ 
b/server/src/main/java/org/apache/kafka/server/partition/AlterPartitionManager.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.partition;
+
+import org.apache.kafka.metadata.LeaderAndIsr;
+import org.apache.kafka.server.common.TopicIdPartition;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Handles updating the ISR by sending AlterPartition requests to the 
controller. Updating the ISR is an asynchronous
+ * operation, so partitions will learn about the result of their request 
through a callback.
+ * <p>
+ * Note that ISR state changes can still be initiated by the controller and 
sent to the partitions via LeaderAndIsr
+ * requests.
+ */
+public interface AlterPartitionManager {
+    void start();
+
+    void shutdown() throws InterruptedException;
+
+    CompletableFuture<LeaderAndIsr> submit(TopicIdPartition topicIdPartition, 
LeaderAndIsr leaderAndIsr);
+}
diff --git 
a/server/src/main/java/org/apache/kafka/server/partition/DefaultAlterPartitionManager.java
 
b/server/src/main/java/org/apache/kafka/server/partition/DefaultAlterPartitionManager.java
new file mode 100644
index 00000000000..dc43d1daee7
--- /dev/null
+++ 
b/server/src/main/java/org/apache/kafka/server/partition/DefaultAlterPartitionManager.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.partition;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.common.errors.OperationNotAttemptedException;
+import org.apache.kafka.common.message.AlterPartitionRequestData;
+import org.apache.kafka.common.message.AlterPartitionResponseData;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AlterPartitionRequest;
+import org.apache.kafka.common.requests.AlterPartitionResponse;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.LeaderAndIsr;
+import org.apache.kafka.metadata.LeaderRecoveryState;
+import org.apache.kafka.server.ControllerInformation;
+import org.apache.kafka.server.NodeToControllerChannelManagerImpl;
+import org.apache.kafka.server.common.ControllerRequestCompletionHandler;
+import org.apache.kafka.server.common.NodeToControllerChannelManager;
+import org.apache.kafka.server.common.TopicIdPartition;
+import org.apache.kafka.server.config.AbstractKafkaConfig;
+import org.apache.kafka.server.util.Scheduler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+public class DefaultAlterPartitionManager implements AlterPartitionManager {
+    private static final Logger log = 
LoggerFactory.getLogger(DefaultAlterPartitionManager.class);
+    private final NodeToControllerChannelManager controllerChannelManager;
+    private final Scheduler scheduler;
+    private final int brokerId;
+    private final Supplier<Long> brokerEpochSupplier;
+
+    // Used to allow only one pending ISR update per partition (visible for 
testing)
+    final ConcurrentHashMap<TopicIdPartition, AlterPartitionItem> 
unsentIsrUpdates = new ConcurrentHashMap<>();
+
+    // Used to allow only one in-flight request at a time
+    private final AtomicBoolean inflightRequest = new AtomicBoolean(false);
+
+    public DefaultAlterPartitionManager(NodeToControllerChannelManager 
controllerChannelManager, Scheduler scheduler, int brokerId, Supplier<Long> 
brokerEpochSupplier) {
+        this.controllerChannelManager = controllerChannelManager;
+        this.scheduler = scheduler;
+        this.brokerId = brokerId;
+        this.brokerEpochSupplier = brokerEpochSupplier;
+    }
+
+    public static DefaultAlterPartitionManager create(AbstractKafkaConfig 
config,
+                                                      Scheduler scheduler,
+                                                      
Supplier<ControllerInformation> controllerNodeProvider,
+                                                      Time time,
+                                                      Metrics metrics,
+                                                      String threadNamePrefix,
+                                                      Supplier<Long> 
brokerEpochSupplier) {
+        NodeToControllerChannelManager channelManager = new 
NodeToControllerChannelManagerImpl(
+                controllerNodeProvider,
+                time,
+                metrics,
+                config,
+                "alter-partition",
+                threadNamePrefix,
+                Long.MAX_VALUE
+        );
+
+        return new DefaultAlterPartitionManager(channelManager, scheduler, 
config.brokerId(), brokerEpochSupplier);
+    }
+
+    @Override
+    public void start() {
+        controllerChannelManager.start();
+    }
+
+    @Override
+    public void shutdown() throws InterruptedException {
+        controllerChannelManager.shutdown();
+    }
+
+    @Override
+    public CompletableFuture<LeaderAndIsr> submit(TopicIdPartition 
topicIdPartition,
+                                                  LeaderAndIsr leaderAndIsr) {
+        CompletableFuture<LeaderAndIsr> future = new CompletableFuture<>();
+        AlterPartitionItem alterPartitionItem = new 
AlterPartitionItem(topicIdPartition, leaderAndIsr, future);
+        boolean enqueued = 
unsentIsrUpdates.putIfAbsent(alterPartitionItem.topicIdPartition(), 
alterPartitionItem) == null;
+        if (enqueued) {
+            maybePropagateIsrChanges();
+        } else {
+            future.completeExceptionally(new 
OperationNotAttemptedException(String.format(
+                    "Failed to enqueue ISR change state %s for partition %s", 
leaderAndIsr, topicIdPartition)));
+        }
+        return future;
+    }
+
+    void maybePropagateIsrChanges() {
+        // Send all pending items if there is not already a request in-flight.
+        if (!unsentIsrUpdates.isEmpty() && 
inflightRequest.compareAndSet(false, true)) {
+            // Copy the current unsent ISR updates but don't remove them from the map; they get cleared in the response handler
+            List<AlterPartitionItem> inflightAlterPartitionItems = new 
ArrayList<>(unsentIsrUpdates.values());
+            sendRequest(inflightAlterPartitionItems);
+        }
+    }
+
+    void clearInFlightRequest() {
+        if (!inflightRequest.compareAndSet(true, false)) {
+            log.warn("Attempting to clear AlterPartition in-flight flag when 
no apparent request is in-flight");
+        }
+    }
+
+    private void sendRequest(List<AlterPartitionItem> 
inflightAlterPartitionItems) {
+        long brokerEpoch = brokerEpochSupplier.get();
+        AlterPartitionRequest.Builder request = 
buildRequest(inflightAlterPartitionItems, brokerEpoch);
+        log.debug("Sending AlterPartition to controller {}", request);
+
+        // We will not time out the AlterPartition request, instead letting it retry indefinitely
+        // until a response is received, or a new LeaderAndIsr overwrites the existing isrState,
+        // which causes the response for those partitions to be ignored.
+        controllerChannelManager.sendRequest(request,
+                new ControllerRequestCompletionHandler() {
+                    @Override
+                    public void onComplete(ClientResponse response) {
+                        log.debug("Received AlterPartition response {}", 
response);
+                        Errors error;
+                        try {
+                            if (response.authenticationException() != null) {
+                                // For now, we treat authentication errors as 
retriable. We use the
+                                // `NETWORK_EXCEPTION` error code for lack of 
a good alternative.
+                                // Note that `NodeToControllerChannelManager` 
will still log the
+                                // authentication errors so that users have a 
chance to fix the problem.
+                                error = Errors.NETWORK_EXCEPTION;
+                            } else if (response.versionMismatch() != null) {
+                                error = Errors.UNSUPPORTED_VERSION;
+                            } else {
+                                error = handleAlterPartitionResponse(
+                                        (AlterPartitionResponse) 
response.responseBody(),
+                                        brokerEpoch,
+                                        inflightAlterPartitionItems
+                                );
+                            }
+                        } finally {
+                            // clear the flag so future requests can proceed
+                            clearInFlightRequest();
+                        }
+
+                        // check if we need to send another request right away
+                        if (error == Errors.NONE) {
+                            // In the normal case, check for pending updates 
to send immediately
+                            maybePropagateIsrChanges();
+                        } else {
+                            // If we received a top-level error from the 
controller, retry the request in the near future
+                            scheduler.scheduleOnce("send-alter-partition", () 
-> maybePropagateIsrChanges(), 50);
+                        }
+                    }
+
+                    @Override
+                    public void onTimeout() {
+                        throw new IllegalStateException("Encountered 
unexpected timeout when sending AlterPartition to the controller");
+                    }
+                });
+    }
+
+    /**
+     * Builds an AlterPartition request.
+     * <p>
+     * While building the request, we don't know which version of the 
AlterPartition API is
+     * supported by the controller. The final decision is taken when the 
AlterPartitionRequest
+     * is built in the network client based on the advertised api versions of 
the controller.
+     *
+     * @return an AlterPartitionRequest.Builder with the provided parameters.
+     */
+    private AlterPartitionRequest.Builder buildRequest(
+            List<AlterPartitionItem> inflightAlterPartitionItems,
+            long brokerEpoch) {
+        AlterPartitionRequestData message = new AlterPartitionRequestData()
+                .setBrokerId(brokerId)
+                .setBrokerEpoch(brokerEpoch);
+
+        inflightAlterPartitionItems.stream()
+                .collect(Collectors.groupingBy(item -> 
item.topicIdPartition().topicId()))
+                .forEach((topicId, items) -> {
+                    AlterPartitionRequestData.TopicData topicData = new 
AlterPartitionRequestData.TopicData().setTopicId(topicId);
+                    message.topics().add(topicData);
+
+                    items.forEach(item -> {
+                        AlterPartitionRequestData.PartitionData partitionData =
+                                new AlterPartitionRequestData.PartitionData()
+                                        
.setPartitionIndex(item.topicIdPartition().partitionId())
+                                        
.setLeaderEpoch(item.leaderAndIsr().leaderEpoch())
+                                        
.setNewIsrWithEpochs(item.leaderAndIsr().isrWithBrokerEpoch())
+                                        
.setPartitionEpoch(item.leaderAndIsr().partitionEpoch());
+
+                        
partitionData.setLeaderRecoveryState(item.leaderAndIsr().leaderRecoveryState().value());
+
+                        topicData.partitions().add(partitionData);
+                    });
+
+                });
+
+        return new AlterPartitionRequest.Builder(message);
+    }
+
+    private Errors handleAlterPartitionResponse(AlterPartitionResponse 
alterPartitionResponse,
+                                                long sentBrokerEpoch,
+                                                List<AlterPartitionItem> 
inflightAlterPartitionItems) {
+        AlterPartitionResponseData data = alterPartitionResponse.data();
+        Errors error = Errors.forCode(data.errorCode());
+        switch (error) {
+            case STALE_BROKER_EPOCH -> log.warn("Broker had a stale broker 
epoch ({}), retrying.", sentBrokerEpoch);
+            case CLUSTER_AUTHORIZATION_FAILED -> log.error("Broker is not 
authorized to send AlterPartition to controller",
+                    Errors.CLUSTER_AUTHORIZATION_FAILED.exception("Broker is 
not authorized to send AlterPartition to controller"));
+            case NONE -> {
+                // Collect partition-level responses to pass to the callbacks
+                Map<TopicIdPartition, LeaderAndIsr> successResponses = new 
HashMap<>();
+                Map<TopicIdPartition, Errors> errorResponses = new HashMap<>();
+                data.topics().forEach(topic ->
+                        topic.partitions().forEach(partition -> {
+                            TopicIdPartition tp = new 
TopicIdPartition(topic.topicId(), partition.partitionIndex());
+                            Errors apiError = 
Errors.forCode(partition.errorCode());
+                            log.debug("Controller successfully handled 
AlterPartition request for {}: {}", tp, partition);
+                            if (apiError == Errors.NONE) {
+                                Optional<LeaderRecoveryState> 
leaderRecoveryStateOpt =
+                                        
LeaderRecoveryState.optionalOf(partition.leaderRecoveryState());
+                                if (leaderRecoveryStateOpt.isPresent()) {
+                                    successResponses.put(tp, new LeaderAndIsr(
+                                            partition.leaderId(),
+                                            partition.leaderEpoch(),
+                                            partition.isr(),
+                                            leaderRecoveryStateOpt.get(),
+                                            partition.partitionEpoch()
+                                    ));
+                                } else {
+                                    log.error("Controller returned an invalid 
leader recovery state ({}) for {}: {}", partition.leaderRecoveryState(), tp, 
partition);
+                                    errorResponses.put(tp, 
Errors.UNKNOWN_SERVER_ERROR);
+                                }
+                            } else {
+                                errorResponses.put(tp, apiError);
+                            }
+                        }
+                        ));
+                // Iterate across the items we sent rather than what we received to ensure every sent item is handled,
+                // even if a partition was somehow erroneously excluded from the response (such an item is kept in the
+                // unsent updates map so it gets re-sent). Note that these callbacks are run from
+                // the leaderIsrUpdateLock write lock in Partition#sendAlterPartitionRequest
+                inflightAlterPartitionItems.forEach(inflightAlterPartition -> {
+                    if 
(successResponses.containsKey(inflightAlterPartition.topicIdPartition())) {
+                        // Regardless of callback outcome, we need to clear the item from the unsent updates map to unblock
+                        // further updates. We clear it now to allow the callback to submit a new update if needed.
+                        
unsentIsrUpdates.remove(inflightAlterPartition.topicIdPartition());
+                        
inflightAlterPartition.future().complete(successResponses.get(inflightAlterPartition.topicIdPartition()));
+                    } else if 
(errorResponses.containsKey(inflightAlterPartition.topicIdPartition())) {
+                        
unsentIsrUpdates.remove(inflightAlterPartition.topicIdPartition());
+                        
inflightAlterPartition.future().completeExceptionally(errorResponses.get(inflightAlterPartition.topicIdPartition()).exception());
+                    } else {
+                        // Don't remove this partition from the update map so 
it will get re-sent
+                        log.warn("Partition {} was sent but not included in 
the response", inflightAlterPartition.topicIdPartition());
+                    }
+                });
+            }
+            default -> log.warn("Controller returned an unexpected top-level 
error when handling AlterPartition request: {}", error);
+        }
+
+        return error;
+    }
+
+}
diff --git 
a/server/src/test/java/org/apache/kafka/server/partition/AlterPartitionManagerTest.java
 
b/server/src/test/java/org/apache/kafka/server/partition/AlterPartitionManagerTest.java
new file mode 100644
index 00000000000..e30a7702ab5
--- /dev/null
+++ 
b/server/src/test/java/org/apache/kafka/server/partition/AlterPartitionManagerTest.java
@@ -0,0 +1,543 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.partition;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.OperationNotAttemptedException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.AlterPartitionRequestData;
+import org.apache.kafka.common.message.AlterPartitionRequestData.BrokerState;
+import org.apache.kafka.common.message.AlterPartitionResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.MessageUtil;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AlterPartitionRequest;
+import org.apache.kafka.common.requests.AlterPartitionResponse;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.metadata.LeaderAndIsr;
+import org.apache.kafka.metadata.LeaderRecoveryState;
+import org.apache.kafka.server.common.ControllerRequestCompletionHandler;
+import org.apache.kafka.server.common.NodeToControllerChannelManager;
+import org.apache.kafka.server.common.TopicIdPartition;
+import org.apache.kafka.server.util.MockScheduler;
+import org.apache.kafka.server.util.MockTime;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.mockito.ArgumentCaptor;
+import org.mockito.ArgumentMatcher;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.test.TestUtils.assertFutureThrows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class AlterPartitionManagerTest {
+    private final Uuid topicId = Uuid.randomUuid();
+    private final MockTime time = new MockTime();
+    private final int brokerId = 1;
+
+    private NodeToControllerChannelManager brokerToController;
+
+    private final TopicIdPartition tp0 = new TopicIdPartition(topicId, 0);
+    private final TopicIdPartition tp1 = new TopicIdPartition(topicId, 1);
+    private final TopicIdPartition tp2 = new TopicIdPartition(topicId, 2);
+
+    // Replace the controller channel with a new Mockito mock before each test.
+    @BeforeEach
+    public void setup() {
+        brokerToController = mock(NodeToControllerChannelManager.class);
+    }
+
+    /**
+     * Submitting one update after {@code start()} should start the controller channel
+     * and send a single request.
+     */
+    @Test
+    public void testBasic() {
+        var manager = new DefaultAlterPartitionManager(brokerToController, new MockScheduler(time), brokerId, () -> 2L);
+        manager.start();
+
+        var update = new LeaderAndIsr(1, 1, List.of(1, 2, 3), LeaderRecoveryState.RECOVERED, 10);
+        manager.submit(tp0, update);
+
+        verify(brokerToController).start();
+        verify(brokerToController).sendRequest(any(), any());
+    }
+
+    /**
+     * An ISR submitted with per-broker epochs should reach the controller as
+     * {@code newIsrWithEpochs}, together with this broker's id and the supplied broker epoch.
+     */
+    @Test
+    public void testBasicWithBrokerEpoch() {
+        var manager = new DefaultAlterPartitionManager(brokerToController, new MockScheduler(time), brokerId, () -> 101L);
+        manager.start();
+
+        // Brokers 1..3 carry epochs 101..103; build the submitted and the expected list independently.
+        List<BrokerState> submittedIsr = new ArrayList<>();
+        List<BrokerState> expectedIsr = new ArrayList<>();
+        for (int id = 1; id <= 3; id++) {
+            submittedIsr.add(new BrokerState().setBrokerId(id).setBrokerEpoch(100 + id));
+            expectedIsr.add(new BrokerState().setBrokerId(id).setBrokerEpoch(100 + id));
+        }
+        manager.submit(tp0, new LeaderAndIsr(1, 1, LeaderRecoveryState.RECOVERED, submittedIsr, 10));
+
+        // Assemble the request payload we expect, from the partition level upwards.
+        var expectedPartition = new AlterPartitionRequestData.PartitionData()
+                .setPartitionIndex(0)
+                .setLeaderEpoch(1)
+                .setPartitionEpoch(10)
+                .setNewIsrWithEpochs(expectedIsr);
+        var expectedTopic = new AlterPartitionRequestData.TopicData().setTopicId(topicId);
+        expectedTopic.partitions().add(expectedPartition);
+        var expectedData = new AlterPartitionRequestData()
+                .setBrokerId(brokerId)
+                .setBrokerEpoch(101);
+        expectedData.topics().add(expectedTopic);
+
+        verify(brokerToController).start();
+        ArgumentCaptor<AbstractRequest.Builder<AlterPartitionRequest>> requestCaptor = ArgumentCaptor.captor();
+        verify(brokerToController).sendRequest(requestCaptor.capture(), any());
+
+        var sentBuilder = (AlterPartitionRequest.Builder) requestCaptor.getValue();
+        assertEquals(expectedData, sentBuilder.build().data());
+    }
+
+    /**
+     * For every {@link LeaderRecoveryState}, the value submitted in the {@code LeaderAndIsr}
+     * should appear unchanged in the partition data of the request that is sent.
+     */
+    @ParameterizedTest
+    @EnumSource(LeaderRecoveryState.class)
+    public void testBasicSentLeaderRecoveryState(LeaderRecoveryState leaderRecoveryState) {
+        var manager = new DefaultAlterPartitionManager(brokerToController, new MockScheduler(time), brokerId, () -> 2L);
+        manager.start();
+        manager.submit(tp0, new LeaderAndIsr(1, 1, List.of(1), leaderRecoveryState, 10));
+
+        ArgumentCaptor<AbstractRequest.Builder<AlterPartitionRequest>> requestCaptor = ArgumentCaptor.captor();
+        verify(brokerToController).start();
+        verify(brokerToController).sendRequest(requestCaptor.capture(), any());
+
+        var sentPartition = requestCaptor.getValue().build().data().topics().get(0).partitions().get(0);
+        assertEquals(leaderRecoveryState.value(), sentPartition.leaderRecoveryState());
+    }
+
+    @Test
+    public void testOverwriteWithinBatch() {
+        ArgumentCaptor<AbstractRequest.Builder<AlterPartitionRequest>> capture 
= ArgumentCaptor.captor();
+        ArgumentCaptor<ControllerRequestCompletionHandler> callbackCapture = 
ArgumentCaptor.captor();
+
+        var scheduler = new MockScheduler(time);
+        var alterPartitionManager = new 
DefaultAlterPartitionManager(brokerToController, scheduler, brokerId, () -> 2L);
+        alterPartitionManager.start();
+
+        // Only send one ISR update for a given topic+partition
+        var firstSubmitFuture = alterPartitionManager.submit(tp0, new 
LeaderAndIsr(1, 1, List.of(1, 2, 3), LeaderRecoveryState.RECOVERED, 10));
+        assertFalse(firstSubmitFuture.isDone());
+
+        var failedSubmitFuture = alterPartitionManager.submit(tp0, new 
LeaderAndIsr(1, 1, List.of(1, 2), LeaderRecoveryState.RECOVERED, 10));
+        assertTrue(failedSubmitFuture.isCompletedExceptionally());
+        assertFutureThrows(OperationNotAttemptedException.class, 
failedSubmitFuture);
+
+        // Simulate response
+        var alterPartitionResp = partitionResponse(tp0, Errors.NONE, 0, 0, 0, 
List.of());
+        var resp = makeClientResponse(alterPartitionResp, 
ApiKeys.ALTER_PARTITION.latestVersion());
+
+        verify(brokerToController).sendRequest(capture.capture(), 
callbackCapture.capture());
+        callbackCapture.getValue().onComplete(resp);
+
+        // Now we can submit this partition again
+        var newSubmitFuture = alterPartitionManager.submit(tp0, new 
LeaderAndIsr(1, 1, List.of(1), LeaderRecoveryState.RECOVERED, 10));
+        assertFalse(newSubmitFuture.isDone());
+
+        verify(brokerToController).start();
+        verify(brokerToController, times(2)).sendRequest(capture.capture(), 
callbackCapture.capture());
+
+        // Make sure we sent the right request ISR={1}
+        var request = capture.getValue().build();
+        assertEquals(1, request.data().topics().size());
+        if (request.version() < 3) {
+            assertEquals(1, 
request.data().topics().get(0).partitions().get(0).newIsr().size());
+        } else {
+            assertEquals(1, 
request.data().topics().get(0).partitions().get(0).newIsrWithEpochs().size());
+        }
+    }
+
+    /**
+     * Updates submitted while a request is in flight are queued and sent together
+     * in a single follow-up request once the in-flight request completes.
+     */
+    @Test
+    public void testSingleBatch() {
+        ArgumentCaptor<AbstractRequest.Builder<AlterPartitionRequest>> requestCaptor = ArgumentCaptor.captor();
+        ArgumentCaptor<ControllerRequestCompletionHandler> handlerCaptor = ArgumentCaptor.captor();
+
+        var manager = new DefaultAlterPartitionManager(brokerToController, new MockScheduler(time), brokerId, () -> 2L);
+        manager.start();
+
+        // Partition 0 goes out immediately as a batch of one; partitions 1..9 queue up behind it.
+        var leaderAndIsr = new LeaderAndIsr(1, 1, List.of(1, 2, 3), LeaderRecoveryState.RECOVERED, 10);
+        for (int partition = 0; partition <= 9; partition++) {
+            manager.submit(new TopicIdPartition(topicId, partition), leaderAndIsr);
+        }
+
+        // An empty response omits partition 0, so it remains in the unsent queue.
+        var emptyResponse = new ClientResponse(null, null, "", 0L, 0L,
+                false, null, null, new AlterPartitionResponse(new AlterPartitionResponseData()));
+
+        // Completing the first request triggers a second one for everything still unsent.
+        verify(brokerToController).sendRequest(requestCaptor.capture(), handlerCaptor.capture());
+        handlerCaptor.getValue().onComplete(emptyResponse);
+
+        verify(brokerToController).start();
+        verify(brokerToController, times(2)).sendRequest(requestCaptor.capture(), handlerCaptor.capture());
+
+        // The last request sent must contain all 10 partitions under one topic.
+        var lastRequest = requestCaptor.getValue().build();
+        assertEquals(1, lastRequest.data().topics().size());
+        assertEquals(10, lastRequest.data().topics().get(0).partitions().size());
+    }
+
+    @Test
+    public void testSubmitFromCallback() throws ExecutionException, 
InterruptedException, TimeoutException {
+        // prepare a partition level retriable error response
+        var alterPartitionRespWithPartitionError = partitionResponse(tp0, 
Errors.UNKNOWN_SERVER_ERROR, 0, 0, 0, List.of());
+        var errorResponse = 
makeClientResponse(alterPartitionRespWithPartitionError, 
ApiKeys.ALTER_PARTITION.latestVersion());
+
+        var leaderId = 1;
+        var leaderEpoch = 1;
+        var partitionEpoch = 10;
+        var isr = List.of(1, 2, 3);
+        var leaderAndIsr = new LeaderAndIsr(leaderId, leaderEpoch, isr, 
LeaderRecoveryState.RECOVERED, partitionEpoch);
+        ArgumentCaptor<ControllerRequestCompletionHandler> callbackCapture = 
ArgumentCaptor.captor();
+
+        var scheduler = new MockScheduler(time);
+        var alterPartitionManager = new 
DefaultAlterPartitionManager(brokerToController, scheduler, brokerId, () -> 2L);
+        alterPartitionManager.start();
+        var future = alterPartitionManager.submit(tp0, leaderAndIsr);
+        var finalFuture = new CompletableFuture<LeaderAndIsr>();
+        future.whenComplete((result, error) -> {
+            if (error != null) {
+                // Retry when error.
+                alterPartitionManager.submit(tp0, 
leaderAndIsr).whenComplete((retryResult, retryError) -> {
+                    if (retryError != null) {
+                        finalFuture.completeExceptionally(retryError);
+                    } else {
+                        finalFuture.complete(retryResult);
+                    }
+                });
+            } else {
+                finalFuture.completeExceptionally(new AssertionError("Expected 
to future to be failed"));
+            }
+        });
+
+        verify(brokerToController).start();
+        verify(brokerToController).sendRequest(any(), 
callbackCapture.capture());
+        reset(brokerToController);
+        callbackCapture.getValue().onComplete(errorResponse);
+
+        // Complete the retry request
+        var retryAlterPartitionResponse = partitionResponse(tp0, Errors.NONE, 
partitionEpoch, leaderId, leaderEpoch, isr);
+        var retryResponse = makeClientResponse(retryAlterPartitionResponse, 
ApiKeys.ALTER_PARTITION.latestVersion());
+
+        verify(brokerToController).sendRequest(any(), 
callbackCapture.capture());
+        callbackCapture.getValue().onComplete(retryResponse);
+
+        assertEquals(leaderAndIsr, finalFuture.get(200, 
TimeUnit.MILLISECONDS));
+        // No more items in unsentIsrUpdates
+        assertFalse(alterPartitionManager.unsentIsrUpdates.containsKey(tp0));
+
+    }
+
+    // Runs the top-level-error retry scenario with CLUSTER_AUTHORIZATION_FAILED.
+    @Test
+    public void testAuthorizationFailed() {
+        testRetryOnTopLevelError(Errors.CLUSTER_AUTHORIZATION_FAILED);
+    }
+
+    // Runs the top-level-error retry scenario with STALE_BROKER_EPOCH.
+    @Test
+    public void testStaleBrokerEpoch() {
+        testRetryOnTopLevelError(Errors.STALE_BROKER_EPOCH);
+    }
+
+    // Runs the top-level-error retry scenario with UNKNOWN_SERVER_ERROR.
+    @Test
+    public void testUnknownServer() {
+        testRetryOnTopLevelError(Errors.UNKNOWN_SERVER_ERROR);
+    }
+
+    @Test
+    public void testRetryOnAuthenticationFailure() {
+        testRetryOnErrorResponse(new ClientResponse(null, null, "", 0L, 0L,
+                false, null, new AuthenticationException("authentication 
failed"), null));
+    }
+
+    @Test
+    public void testRetryOnUnsupportedVersionError() {
+        testRetryOnErrorResponse(new ClientResponse(null, null, "", 0L, 0L,
+                false, new UnsupportedVersionException("unsupported version"), 
null, null));
+    }
+
+    /**
+     * Builds a response whose only content is the given top-level error code
+     * and runs the retry scenario against it.
+     */
+    private void testRetryOnTopLevelError(Errors error) {
+        var responseData = new AlterPartitionResponseData().setErrorCode(error.code());
+        var clientResponse = makeClientResponse(
+                new AlterPartitionResponse(responseData),
+                ApiKeys.ALTER_PARTITION.latestVersion());
+        testRetryOnErrorResponse(clientResponse);
+    }
+
+    /**
+     * Shared retry scenario: after the given failed response the update must stay queued,
+     * be re-sent once the scheduler runs, and be cleared by a subsequent successful response.
+     */
+    private void testRetryOnErrorResponse(ClientResponse response) {
+        ArgumentCaptor<ControllerRequestCompletionHandler> handlerCaptor = ArgumentCaptor.captor();
+
+        var scheduler = new MockScheduler(time);
+        var manager = new DefaultAlterPartitionManager(brokerToController, scheduler, brokerId, () -> 2L);
+        manager.start();
+        manager.submit(tp0, new LeaderAndIsr(1, 1, List.of(1, 2, 3), LeaderRecoveryState.RECOVERED, 10));
+
+        verify(brokerToController).start();
+        verify(brokerToController).sendRequest(any(), handlerCaptor.capture());
+        handlerCaptor.getValue().onComplete(response);
+
+        // The failed update is kept in the pending map so that it can be retried.
+        assertTrue(manager.unsentIsrUpdates.containsKey(tp0));
+
+        reset(brokerToController);
+
+        // Advance the clock and run the scheduler so the retry is sent.
+        time.sleep(100);
+        scheduler.tick();
+
+        // A successful answer to the retry clears the pending entry.
+        var successResponse = makeClientResponse(
+                partitionResponse(tp0, Errors.NONE, 0, 0, 0, List.of()),
+                ApiKeys.ALTER_PARTITION.latestVersion());
+
+        verify(brokerToController).sendRequest(any(), handlerCaptor.capture());
+        handlerCaptor.getValue().onComplete(successResponse);
+
+        assertFalse(manager.unsentIsrUpdates.containsKey(tp0));
+    }
+
+    // Runs the partition-level error scenario with INVALID_UPDATE_VERSION.
+    @Test
+    public void testInvalidUpdateVersion() {
+        checkPartitionError(Errors.INVALID_UPDATE_VERSION);
+    }
+
+    // Runs the partition-level error scenario with UNKNOWN_TOPIC_OR_PARTITION.
+    @Test
+    public void testUnknownTopicPartition() {
+        checkPartitionError(Errors.UNKNOWN_TOPIC_OR_PARTITION);
+    }
+
+    // Runs the partition-level error scenario with NOT_LEADER_OR_FOLLOWER.
+    @Test
+    public void testNotLeaderOrFollower() {
+        checkPartitionError(Errors.NOT_LEADER_OR_FOLLOWER);
+    }
+
+    // Runs the partition-level error scenario with INVALID_REQUEST.
+    @Test
+    public void testInvalidRequest() {
+        checkPartitionError(Errors.INVALID_REQUEST);
+    }
+
+    /**
+     * Fails an update for tp0 with the given partition-level error, then checks that
+     * a new update for the same partition is accepted and left pending.
+     */
+    private void checkPartitionError(Errors error) {
+        var manager = testPartitionError(tp0, error);
+        var nextUpdate = new LeaderAndIsr(1, 1, List.of(1, 2, 3), LeaderRecoveryState.RECOVERED, 10);
+        assertFalse(manager.submit(tp0, nextUpdate).isDone());
+    }
+
+    private AlterPartitionManager testPartitionError(TopicIdPartition tp, 
Errors error) {
+        ArgumentCaptor<ControllerRequestCompletionHandler> callbackCapture = 
ArgumentCaptor.captor();
+        reset(brokerToController);
+
+        var scheduler = new MockScheduler(time);
+        var alterPartitionManager = new 
DefaultAlterPartitionManager(brokerToController, scheduler, brokerId, () -> 2L);
+        alterPartitionManager.start();
+
+        var future = alterPartitionManager.submit(tp, new LeaderAndIsr(1, 1, 
List.of(1, 2, 3), LeaderRecoveryState.RECOVERED, 10));
+
+        verify(brokerToController).start();
+        verify(brokerToController).sendRequest(any(), 
callbackCapture.capture());
+        reset(brokerToController);
+
+        var alterPartitionResp = partitionResponse(tp, error, 0, 0, 0, 
List.of());
+        var resp = makeClientResponse(alterPartitionResp, 
ApiKeys.ALTER_PARTITION.latestVersion());
+        callbackCapture.getValue().onComplete(resp);
+        assertTrue(future.isCompletedExceptionally());
+        assertFutureThrows(error.exception().getClass(), future);
+        return alterPartitionManager;
+    }
+
+    /**
+     * Only one request is in flight at a time: updates submitted meanwhile wait,
+     * and a further request is sent once the in-flight request's callback runs.
+     */
+    @Test
+    public void testOneInFlight() {
+        ArgumentCaptor<ControllerRequestCompletionHandler> handlerCaptor = ArgumentCaptor.captor();
+
+        var manager = new DefaultAlterPartitionManager(brokerToController, new MockScheduler(time), brokerId, () -> 2L);
+        manager.start();
+
+        // tp0 triggers the request; tp1 and tp2 become pending unsent items.
+        for (var partition : List.of(tp0, tp1, tp2)) {
+            manager.submit(partition, new LeaderAndIsr(1, 1, List.of(1, 2, 3), LeaderRecoveryState.RECOVERED, 10));
+        }
+
+        verify(brokerToController).start();
+        verify(brokerToController).sendRequest(any(), handlerCaptor.capture());
+
+        // Forget the interactions so far so that only the follow-up send is counted below.
+        reset(brokerToController);
+
+        var emptyResponse = makeClientResponse(
+                new AlterPartitionResponse(new AlterPartitionResponseData()),
+                ApiKeys.ALTER_PARTITION.latestVersion());
+        handlerCaptor.getValue().onComplete(emptyResponse);
+
+        // The pending items (tp1, tp2) go out after the callback.
+        verify(brokerToController).sendRequest(any(), any());
+    }
+
+    /**
+     * A partition that was sent but is absent from the response must stay queued
+     * and be retried in the next request, while answered partitions complete.
+     */
+    @Test
+    public void testPartitionMissingInResponse() {
+        var version = ApiKeys.ALTER_PARTITION.latestVersion();
+        var leaderAndIsr = new LeaderAndIsr(1, 1, List.of(1, 2, 3), LeaderRecoveryState.RECOVERED, 10);
+        var manager = new DefaultAlterPartitionManager(brokerToController, new MockScheduler(time), brokerId, () -> 2L);
+        manager.start();
+
+        // The first submit sends a request containing only tp0.
+        var tp0Future = manager.submit(tp0, leaderAndIsr);
+        var firstHandler = verifySendRequest(brokerToController, alterPartitionRequestMatcher(Set.of(tp0), version));
+
+        // Submissions made while that request is in flight are queued.
+        var tp1Future = manager.submit(tp1, leaderAndIsr);
+        var tp2Future = manager.submit(tp2, leaderAndIsr);
+
+        // Answering the first request fails tp0 and lets the queued partitions be sent.
+        var tp0Error = partitionResponse(tp0, Errors.UNKNOWN_SERVER_ERROR, 0, 0, 0, List.of());
+        firstHandler.onComplete(makeClientResponse(tp0Error, version));
+        assertFutureThrows(UnknownServerException.class, tp0Future);
+        assertFalse(tp1Future.isDone());
+        assertFalse(tp2Future.isDone());
+
+        // The second request holds tp1 and tp2, but the response only mentions tp2.
+        var secondHandler = verifySendRequest(brokerToController, alterPartitionRequestMatcher(Set.of(tp1, tp2), version));
+        var tp2Error = partitionResponse(tp2, Errors.UNKNOWN_SERVER_ERROR, 0, 0, 0, List.of());
+        secondHandler.onComplete(makeClientResponse(tp2Error, version));
+        assertFutureThrows(UnknownServerException.class, tp2Future);
+        assertFalse(tp1Future.isDone());
+
+        // tp1 was missing from the response, so it is retried on its own.
+        var thirdHandler = verifySendRequest(brokerToController, alterPartitionRequestMatcher(Set.of(tp1), version));
+        var tp1Error = partitionResponse(tp1, Errors.UNKNOWN_SERVER_ERROR, 0, 0, 0, List.of());
+        thirdHandler.onComplete(makeClientResponse(tp1Error, version));
+        assertFutureThrows(UnknownServerException.class, tp1Future);
+    }
+
+    /**
+     * Verifies that a request matching {@code expectedRequest} was sent on the given mock,
+     * resets the mock, and returns the completion handler passed with that request.
+     */
+    private ControllerRequestCompletionHandler verifySendRequest(NodeToControllerChannelManager brokerToController,
+                                                                 ArgumentMatcher<AbstractRequest.Builder<? extends AbstractRequest>> expectedRequest) {
+        ArgumentCaptor<ControllerRequestCompletionHandler> handlerCaptor = ArgumentCaptor.captor();
+        verify(brokerToController).sendRequest(argThat(expectedRequest), handlerCaptor.capture());
+
+        // Read the captured handler before clearing the mock's recorded interactions.
+        var handler = handlerCaptor.getValue();
+        reset(brokerToController);
+        return handler;
+    }
+
+    /**
+     * Creates a matcher that accepts an AlterPartition request builder only when the built
+     * request covers exactly {@code expectedTopicPartitions}. The API key and the request
+     * version are checked with assertions, so a mismatch there fails the test directly.
+     */
+    private ArgumentMatcher<AbstractRequest.Builder<? extends AbstractRequest>> alterPartitionRequestMatcher(Set<TopicIdPartition> expectedTopicPartitions,
+                                                                                                             Short expectedVersion) {
+        return request -> {
+            assertEquals(ApiKeys.ALTER_PARTITION, request.apiKey());
+
+            var builtRequest = ((AlterPartitionRequest.Builder) request).build();
+            assertEquals(expectedVersion, builtRequest.version());
+
+            // Reduce both sides to (topicId, partition) keys so they can be compared as sets.
+            Set<TopicPartitionKey> expectedKeys = expectedTopicPartitions.stream()
+                    .map(tp -> new TopicPartitionKey(tp.topicId(), tp.partitionId()))
+                    .collect(Collectors.toSet());
+            Set<TopicPartitionKey> actualKeys = builtRequest.data().topics().stream()
+                    .flatMap(topic -> topic.partitions().stream()
+                            .map(partition -> new TopicPartitionKey(topic.topicId(), partition.partitionIndex())))
+                    .collect(Collectors.toSet());
+
+            return expectedKeys.equals(actualKeys);
+        };
+    }
+
+    // (topic id, partition index) pair used by alterPartitionRequestMatcher to compare partition sets.
+    record TopicPartitionKey(Uuid topicId, int partitionId) { }
+
+    /**
+     * Wraps the given response in a {@link ClientResponse} with a header for the given version.
+     * The response body is first serialized and parsed back at that version.
+     */
+    private ClientResponse makeClientResponse(AlterPartitionResponse response, short version) {
+        var header = new RequestHeader(response.apiKey(), version, "", 0);
+        // Round-trip through the wire format to ensure the body does not contain
+        // ignorable fields used by other versions.
+        var roundTripped = AlterPartitionResponse.parse(
+                MessageUtil.toByteBufferAccessor(response.data(), version), version);
+        return new ClientResponse(header, null, "", 0L, 0L, false, null, null, roundTripped);
+    }
+
+    /**
+     * Builds an AlterPartition response containing one topic with one partition entry
+     * populated from the given values and error code.
+     */
+    private AlterPartitionResponse partitionResponse(
+            TopicIdPartition tp,
+            Errors error,
+            int partitionEpoch,
+            int leaderId,
+            int leaderEpoch,
+            List<Integer> isr) {
+        var partitionData = new AlterPartitionResponseData.PartitionData()
+                .setPartitionIndex(tp.partitionId())
+                .setPartitionEpoch(partitionEpoch)
+                .setLeaderEpoch(leaderEpoch)
+                .setLeaderId(leaderId)
+                .setIsr(isr)
+                .setErrorCode(error.code());
+        var topicData = new AlterPartitionResponseData.TopicData()
+                .setTopicId(tp.topicId())
+                .setPartitions(List.of(partitionData));
+        var responseData = new AlterPartitionResponseData().setTopics(List.of(topicData));
+        return new AlterPartitionResponse(responseData);
+    }
+}
diff --git 
a/server/src/test/java/org/apache/kafka/server/util/MockAlterPartitionManager.java
 
b/server/src/test/java/org/apache/kafka/server/util/MockAlterPartitionManager.java
new file mode 100644
index 00000000000..a4d4d07697d
--- /dev/null
+++ 
b/server/src/test/java/org/apache/kafka/server/util/MockAlterPartitionManager.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.util;
+
+import org.apache.kafka.common.errors.OperationNotAttemptedException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.metadata.LeaderAndIsr;
+import org.apache.kafka.server.common.TopicIdPartition;
+import org.apache.kafka.server.partition.AlterPartitionItem;
+import org.apache.kafka.server.partition.AlterPartitionManager;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * Test double for {@link AlterPartitionManager} that accepts at most one outstanding update.
+ * The update is parked until the test resolves it with {@link #completeIsrUpdate(int)}
+ * or {@link #failIsrUpdate(Errors)}.
+ */
+public class MockAlterPartitionManager implements AlterPartitionManager {
+    // Visible for testing
+    public final Deque<AlterPartitionItem> isrUpdates = new ArrayDeque<>();
+    // True while a submitted update is waiting to be completed or failed.
+    private final AtomicBoolean inFlight = new AtomicBoolean(false);
+
+    /** No-op: the mock has nothing to start. */
+    @Override
+    public void start() {
+    }
+
+    /** No-op: the mock has nothing to shut down. */
+    @Override
+    public void shutdown() throws InterruptedException {
+    }
+
+    /**
+     * Records the update if none is in flight; otherwise returns a future that has already
+     * failed with {@link OperationNotAttemptedException}.
+     */
+    @Override
+    public CompletableFuture<LeaderAndIsr> submit(TopicIdPartition topicIdPartition, LeaderAndIsr leaderAndIsr) {
+        var result = new CompletableFuture<LeaderAndIsr>();
+        if (!inFlight.compareAndSet(false, true)) {
+            var message = String.format(
+                    "Failed to enqueue AlterIsr request for %s since there is already an inflight request",
+                    topicIdPartition);
+            result.completeExceptionally(new OperationNotAttemptedException(message));
+            return result;
+        }
+        isrUpdates.add(new AlterPartitionItem(topicIdPartition, leaderAndIsr, result));
+        return result;
+    }
+
+    /**
+     * Completes the in-flight update successfully, returning the submitted state
+     * with its partition epoch replaced by {@code newPartitionEpoch}.
+     */
+    public void completeIsrUpdate(int newPartitionEpoch) {
+        var item = takeInFlightItem();
+        item.future().complete(item.leaderAndIsr().withPartitionEpoch(newPartitionEpoch));
+    }
+
+    /** Fails the in-flight update with the exception mapped to {@code error}. */
+    public void failIsrUpdate(Errors error) {
+        var item = takeInFlightItem();
+        item.future().completeExceptionally(error.exception());
+    }
+
+    /** Clears the in-flight flag and dequeues the parked update; fails the test if nothing is in flight. */
+    private AlterPartitionItem takeInFlightItem() {
+        if (!inFlight.compareAndSet(true, false)) {
+            fail("Expected an in-flight ISR update, but there was none");
+        }
+        return isrUpdates.poll();
+    }
+}

Reply via email to