showuon commented on code in PR #15557:
URL: https://github.com/apache/kafka/pull/15557#discussion_r1546078129


##########
core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala:
##########
@@ -188,9 +194,52 @@ class ReplicaAlterLogDirsThread(name: String,
   }
 }
 object ReplicaAlterLogDirsThread {
-  sealed trait DirectoryEventRequestState
+  /**
+   * @param reassignmentState Tracks the state of the replica-to-directory 
assignment update in the metadata
+   * @param topicId           The ID of the topic, which is useful if a 
reverting the assignment is required
+   * @param currentDir        The original directory ID from which the future 
replica fetches from
+   */
+  case class PromotionState(reassignmentState: ReassignmentState, topicId: 
Uuid, currentDir: Uuid) {
+    def withAssignment(newDirReassignmentState: ReassignmentState): 
PromotionState =
+      PromotionState(newDirReassignmentState, topicId, currentDir)
+  }
+
+  /**
+   * Represents the state of the request to update the directory assignment 
from the current replica directory
+   * to the future replica directory.
+   */
+  sealed trait ReassignmentState {
+    /**
+     * @return true if the directory assignment in the cluster metadata may be 
inconsistent with the actual
+     *         directory where the main replica is hosted.
+     */
+    def inconsistentMetadata: Boolean = false

Review Comment:
   If the metadata is only "maybe" inconsistent, should we make that clear in the method name?



##########
core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala:
##########
@@ -188,9 +194,52 @@ class ReplicaAlterLogDirsThread(name: String,
   }
 }
 object ReplicaAlterLogDirsThread {
-  sealed trait DirectoryEventRequestState
+  /**
+   * @param reassignmentState Tracks the state of the replica-to-directory 
assignment update in the metadata
+   * @param topicId           The ID of the topic, which is useful if a 
reverting the assignment is required
+   * @param currentDir        The original directory ID from which the future 
replica fetches from
+   */
+  case class PromotionState(reassignmentState: ReassignmentState, topicId: 
Uuid, currentDir: Uuid) {
+    def withAssignment(newDirReassignmentState: ReassignmentState): 
PromotionState =
+      PromotionState(newDirReassignmentState, topicId, currentDir)
+  }
+
+  /**
+   * Represents the state of the request to update the directory assignment 
from the current replica directory
+   * to the future replica directory.
+   */
+  sealed trait ReassignmentState {
+    /**
+     * @return true if the directory assignment in the cluster metadata may be 
inconsistent with the actual
+     *         directory where the main replica is hosted.
+     */
+    def inconsistentMetadata: Boolean = false
+  }
+
+  object ReassignmentState {
 
-  case object QUEUED extends DirectoryEventRequestState
+    /**
+     * The request has not been created.
+     */
+    case object None extends ReassignmentState
 
-  case object COMPLETED extends DirectoryEventRequestState
+    /**
+     * The request has been queued, it may or may not yet have been sent to 
the Controller.
+     */
+    case object Queued extends ReassignmentState{
+      override def inconsistentMetadata: Boolean = true
+    }
+
+    /**
+     * The controller has acknowledged the new directory assignment and 
persisted the change in metadata.
+     */
+    case object Accepted extends ReassignmentState{

Review Comment:
   Ditto: please add a space after `ReassignmentState`, before the opening `{`.



##########
core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala:
##########
@@ -188,9 +194,52 @@ class ReplicaAlterLogDirsThread(name: String,
   }
 }
 object ReplicaAlterLogDirsThread {
-  sealed trait DirectoryEventRequestState
+  /**
+   * @param reassignmentState Tracks the state of the replica-to-directory 
assignment update in the metadata
+   * @param topicId           The ID of the topic, which is useful if a 
reverting the assignment is required
+   * @param currentDir        The original directory ID from which the future 
replica fetches from
+   */
+  case class PromotionState(reassignmentState: ReassignmentState, topicId: 
Uuid, currentDir: Uuid) {
+    def withAssignment(newDirReassignmentState: ReassignmentState): 
PromotionState =
+      PromotionState(newDirReassignmentState, topicId, currentDir)
+  }
+
+  /**
+   * Represents the state of the request to update the directory assignment 
from the current replica directory
+   * to the future replica directory.
+   */
+  sealed trait ReassignmentState {
+    /**
+     * @return true if the directory assignment in the cluster metadata may be 
inconsistent with the actual
+     *         directory where the main replica is hosted.
+     */
+    def inconsistentMetadata: Boolean = false
+  }
+
+  object ReassignmentState {
 
-  case object QUEUED extends DirectoryEventRequestState
+    /**
+     * The request has not been created.
+     */
+    case object None extends ReassignmentState
 
-  case object COMPLETED extends DirectoryEventRequestState
+    /**
+     * The request has been queued, it may or may not yet have been sent to 
the Controller.
+     */
+    case object Queued extends ReassignmentState{

Review Comment:
   Please add a space after `ReassignmentState`, before the opening `{`.



##########
core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala:
##########
@@ -96,57 +97,55 @@ class ReplicaAlterLogDirsThread(name: String,
   }
 
   override def removePartitions(topicPartitions: Set[TopicPartition]): 
Map[TopicPartition, PartitionFetchState] = {
-    // Schedule assignment request to revert any queued request before 
cancelling
-    for {
-      topicPartition <- topicPartitions
-      partitionState <- partitionAssignmentRequestState(topicPartition)
-      if partitionState == QUEUED
-      partition = replicaMgr.getPartitionOrException(topicPartition)
-      topicId <- partition.topicId
-      directoryId <- partition.logDirectoryId()
-      topicIdPartition = new TopicIdPartition(topicId, 
topicPartition.partition())
-    } directoryEventHandler.handleAssignment(topicIdPartition, directoryId, () 
=> ())
+    for (topicPartition <- topicPartitions) {
+      // Revert any reassignments for partitions that did not complete the 
future replica promotion
+      val PromotionState(reassignmentState, topicId, originalDir) = 
this.promotionState(topicPartition)
+      if (reassignmentState.inconsistentMetadata) {
+        directoryEventHandler.handleAssignment(new TopicIdPartition(topicId, 
topicPartition.partition()), originalDir, () => ())

Review Comment:
   From 
[KIP-858](https://cwiki.apache.org/confluence/display/KAFKA/KIP-858%3A+Handle+JBOD+broker+disk+failure+in+KRaft#KIP858:HandleJBODbrokerdiskfailureinKRaft-Intra-brokerreplicamovement):
   
   > In this exceptional case, the broker issues a AssignReplicasToDirs RPC to 
the Controller to assignment the replica to UUID.LOST_DIR - this lets the 
Controller know that it needs to update leadership and ISR for this partition 
too.
   
   But it looks like we only revert the assignment back to the source directory.
Questions:
   1. Will the controller perform a leader election for this partition in this case?
   2. If so, should we update the KIP? If not, should we follow the KIP?
   3. We might need to add tests to make sure the leader is re-elected.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to