soarez commented on code in PR #15557:
URL: https://github.com/apache/kafka/pull/15557#discussion_r1557584496
##########
core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala:
##########
@@ -96,57 +97,55 @@ class ReplicaAlterLogDirsThread(name: String,
}
override def removePartitions(topicPartitions: Set[TopicPartition]): Map[TopicPartition, PartitionFetchState] = {
- // Schedule assignment request to revert any queued request before cancelling
- for {
- topicPartition <- topicPartitions
- partitionState <- partitionAssignmentRequestState(topicPartition)
- if partitionState == QUEUED
- partition = replicaMgr.getPartitionOrException(topicPartition)
- topicId <- partition.topicId
- directoryId <- partition.logDirectoryId()
- topicIdPartition = new TopicIdPartition(topicId, topicPartition.partition())
- } directoryEventHandler.handleAssignment(topicIdPartition, directoryId, () => ())
+ for (topicPartition <- topicPartitions) {
+ if (this.promotionStates.containsKey(topicPartition)) {
+ val PromotionState(reassignmentState, topicId, originalDir) = this.promotionStates.get(topicPartition)
+ // Revert any reassignments for partitions that did not complete the future replica promotion
+ if (originalDir.isDefined && topicId.isDefined && reassignmentState.maybeInconsistentMetadata) {
+ directoryEventHandler.handleAssignment(new TopicIdPartition(topicId.get, topicPartition.partition()), originalDir.get, () => ())
+ }
+ this.promotionStates.remove(topicPartition)
+ }
Review Comment:
Great question.
> in what circumstance, the promotionStates will not contain the topic
partition?
`promotionStates` will not contain the partition if it is removed twice, or
removed without ever having been added. The `addPartitions`/`removePartitions`
semantics in `ReplicaAlterLogDirsThread` seem to have been intended to tolerate
this. Have a look at `handleLogDirFailure` in `ReplicaManager`: when a
directory fails, all partitions in the failed directory end up being passed to
`removePartitions` (via `removeFetcherForPartitions`), regardless of whether
they were ever added. So `removePartitions` needs to be idempotent and must
tolerate unknown partitions.
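To make the idempotency point concrete, here is a minimal sketch. It is not the actual `ReplicaAlterLogDirsThread` code: `State`, `states`, `reverted`, and `addPartition` are hypothetical stand-ins, and partitions are plain strings. It only shows that a removal which tolerates unknown keys is safe to call twice.

```scala
import scala.collection.mutable

object IdempotentRemoveSketch {
  // Hypothetical stand-in for the per-partition state kept by the thread.
  final case class State(originalDir: Option[String])

  val states: mutable.Map[String, State] = mutable.Map.empty
  // Records which partitions had a revert issued (stand-in for handleAssignment).
  val reverted: mutable.ListBuffer[String] = mutable.ListBuffer.empty

  def addPartition(tp: String, originalDir: Option[String]): Unit =
    states.put(tp, State(originalDir))

  def removePartitions(tps: Set[String]): Unit =
    for (tp <- tps) {
      // remove returns None for unknown or already-removed partitions,
      // so the revert below is simply skipped for them.
      states.remove(tp).foreach { state =>
        state.originalDir.foreach(_ => reverted += tp)
      }
    }
}
```

Calling `removePartitions` with a partition that was never added, or calling it a second time with the same partition, leaves `reverted` unchanged.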
> before this change, we'll send out AssignReplicasToDirsRequest no matter
what, but now, we will skip when no PromotionState
No, I think the behavior is the same as before this change. Previously,
`partitionState <- partitionAssignmentRequestState(topicPartition)` ran inside
the Scala `for` comprehension, and if
`partitionAssignmentRequestState(topicPartition)` returned `None`, the
remaining steps were skipped for that partition. That's equivalent to an `if`
gate on `this.promotionStates.containsKey(topicPartition)`.
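A small standalone sketch of that equivalence, in case it helps. The names here (`queued`, `stateOf`, `viaFor`, `viaIf`) are made up for illustration and use strings instead of the real Kafka types. It only demonstrates that an `Option` generator yielding `None` skips the body for that element, which is what an explicit `if` check does too.

```scala
object ForComprehensionGateSketch {
  // Hypothetical lookup standing in for partitionAssignmentRequestState.
  val queued: Map[String, String] = Map("t-0" -> "QUEUED", "t-2" -> "DISPATCHED")

  def stateOf(tp: String): Option[String] = queued.get(tp)

  // Old shape: Option generator plus guard inside a for comprehension.
  def viaFor(tps: Set[String]): Set[String] = {
    val handled = Set.newBuilder[String]
    for {
      tp <- tps
      state <- stateOf(tp) // None: nothing below runs for this tp
      if state == "QUEUED"
    } handled += tp
    handled.result()
  }

  // New shape: explicit containment check before touching the state.
  def viaIf(tps: Set[String]): Set[String] = {
    val handled = Set.newBuilder[String]
    for (tp <- tps) {
      if (queued.contains(tp) && queued(tp) == "QUEUED") handled += tp
    }
    handled.result()
  }
}
```

Both functions return the same set for any input, including partitions with no entry in `queued`.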
> Should we still invoke directoryEventHandler.handleAssignment even if no
PromotionState? I think no, but I'd like to hear your thought here.
I agree we should not invoke `directoryEventHandler.handleAssignment` if
there is no `PromotionState`, because a `PromotionState` is only created when
`addPartitions` is called for the partition.