soarez commented on code in PR #15557:
URL: https://github.com/apache/kafka/pull/15557#discussion_r1557584496


##########
core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala:
##########
@@ -96,57 +97,55 @@ class ReplicaAlterLogDirsThread(name: String,
   }
 
   override def removePartitions(topicPartitions: Set[TopicPartition]): Map[TopicPartition, PartitionFetchState] = {
-    // Schedule assignment request to revert any queued request before cancelling
-    for {
-      topicPartition <- topicPartitions
-      partitionState <- partitionAssignmentRequestState(topicPartition)
-      if partitionState == QUEUED
-      partition = replicaMgr.getPartitionOrException(topicPartition)
-      topicId <- partition.topicId
-      directoryId <- partition.logDirectoryId()
-      topicIdPartition = new TopicIdPartition(topicId, topicPartition.partition())
-    } directoryEventHandler.handleAssignment(topicIdPartition, directoryId, () => ())
+    for (topicPartition <- topicPartitions) {
+      if (this.promotionStates.containsKey(topicPartition)) {
+        val PromotionState(reassignmentState, topicId, originalDir) = this.promotionStates.get(topicPartition)
+        // Revert any reassignments for partitions that did not complete the future replica promotion
+        if (originalDir.isDefined && topicId.isDefined && reassignmentState.maybeInconsistentMetadata) {
+          directoryEventHandler.handleAssignment(new TopicIdPartition(topicId.get, topicPartition.partition()), originalDir.get, () => ())
+        }
+        this.promotionStates.remove(topicPartition)
+      }

Review Comment:
   Great question.
   
   > in what circumstance, the promotionStates will not contain the topic 
partition?
   
   `promotionStates` will not contain the partition if it is removed twice, or 
removed without ever having been added. The `addPartitions`/`removePartitions` 
semantics in `ReplicaAlterLogDirsThread` seem to have been intended to tolerate 
this. Have a look at `handleLogDirFailure` in `ReplicaManager`: when a 
directory fails, all partitions in the failed directory end up being passed to 
`removePartitions` (via `removeFetcherForPartitions`), regardless of whether 
they were ever added. So `removePartitions` needs to be idempotent and 
tolerate unknown partitions.
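   
   To illustrate the idempotency requirement, here is a minimal sketch. It is 
not the code in this PR: `TopicPartition` and `PromotionState` below are 
simplified hypothetical stand-ins for the real Kafka types, and 
`IdempotentRemoveSketch` is an invented name.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for the Kafka types; not the real classes.
final case class TopicPartition(topic: String, partition: Int)
final case class PromotionState(originalDir: Option[String])

object IdempotentRemoveSketch {
  private val promotionStates =
    mutable.Map.empty[TopicPartition, PromotionState]

  def addPartition(tp: TopicPartition, state: PromotionState): Unit =
    promotionStates.put(tp, state)

  // Returns only the partitions whose state was actually removed.
  // Partitions that were never added (or were already removed) are
  // skipped, so calling this repeatedly with the same set is safe.
  def removePartitions(tps: Set[TopicPartition]): Set[TopicPartition] =
    tps.filter(tp => promotionStates.remove(tp).isDefined)
}
```

   Calling `removePartitions` a second time with the same set simply returns 
an empty set in this sketch, which is the kind of tolerance that the 
`handleLogDirFailure` path relies on.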
   
   > before this change, we'll send out AssignReplicasToDirsRequest no matter 
what, but now, we will skip when no PromotionState
   
   No, I think the behavior is the same as before this change. Previously, 
`partitionState <- partitionAssignmentRequestState(topicPartition)` ran inside 
the Scala `for` comprehension, so if 
`partitionAssignmentRequestState(topicPartition)` returned `None`, the rest of 
that iteration was skipped for the partition. That's equivalent to an `if` 
gate on `this.promotionStates.containsKey(topicPartition)`.
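   
   As a rough illustration of that equivalence, assuming a hypothetical 
`lookup` function in place of `partitionAssignmentRequestState` and plain 
strings in place of the real key and state types:

```scala
object OptionGateSketch {
  // Hypothetical state table; only "known" has an entry.
  private val states = Map("known" -> "QUEUED")

  // Stand-in for partitionAssignmentRequestState: None for unknown keys.
  def lookup(key: String): Option[String] = states.get(key)

  // An Option used as a generator: when lookup returns None, the
  // remaining steps of that iteration are skipped.
  def viaForComprehension(keys: Seq[String]): Seq[String] =
    for {
      key   <- keys
      state <- lookup(key)
      if state == "QUEUED"
    } yield key

  // The same logic written with an explicit containment check.
  def viaExplicitGate(keys: Seq[String]): Seq[String] = {
    val out = Seq.newBuilder[String]
    for (key <- keys) {
      if (states.contains(key)) {
        if (states(key) == "QUEUED") out += key
      }
    }
    out.result()
  }
}
```

   In this sketch both functions select the same keys for any input, since an 
unknown key is dropped either by the `None` generator or by the `contains` 
check.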
   
   > Should we still invoke directoryEventHandler.handleAssignment even if no 
PromotionState? I think no, but I'd like to hear your thought here.
   
   I agree we should not invoke `directoryEventHandler.handleAssignment` if 
there is no `PromotionState`, because a `PromotionState` is only created when 
`addPartitions` is called for the partition.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
