chia7712 commented on code in PR #15951:
URL: https://github.com/apache/kafka/pull/15951#discussion_r1609423094
##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -2114,16 +2114,12 @@ class ReplicaManager(val config: KafkaConfig,
partition.log.foreach { _ =>
val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
- // Add future replica log to partition's map
- partition.createLogIfNotExists(
- isNew = false,
- isFutureReplica = true,
- offsetCheckpoints,
- topicIds(partition.topic))
-
- // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
- // replica from source dir to destination dir
- logManager.abortAndPauseCleaning(topicPartition)
+ // Add future replica log to partition's map if it's not existed
Review Comment:
Oh, my previous comment was incorrect. Both `alterReplicaLogDirs` and
`maybeAddLogDirFetchers` run under `replicaStateChangeLock`, so the race condition
I described should not happen.
However, I'm wondering whether it is fine for `maybeAddLogDirFetchers` to add
the alter thread even though the partition's future log was already
created by another thread. No new alter thread would actually be created, since
the `BrokerIdAndFetcherId` is identical.
In short, `alterReplicaLogDirs` adds the alter thread [0] only if it succeeds in
creating the partition's future log. Maybe `maybeAddLogDirFetchers` should follow
the same rule? Or we can add a comment saying "that is fine, as
`replicaAlterLogDirsManager.addFetcherForPartitions` will be a no-op in this
case"?
[0]
https://github.com/apache/kafka/blob/5552f5c26df4eb07b2d6ee218e4a29e4ca790d5c/core/src/main/scala/kafka/server/ReplicaManager.scala#L1198
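To make the two options concrete, here is a toy sketch. It is not the Kafka code: `ToyPartition`, `ToyAlterDirManager`, `ToyReplicaManager` and all of their methods are hypothetical stand-ins, and the only assumption carried over is that fetcher threads are keyed by a (broker id, fetcher id) pair, so re-adding a partition under an existing key starts no new thread.

```scala
import scala.collection.mutable

// Toy model only; every name here is a hypothetical stand-in, not a Kafka API.
final class ToyPartition(val name: String) {
  private var futureLogDir: Option[String] = None

  // Returns true only when this call actually created the future log.
  def createFutureLogIfAbsent(dir: String): Boolean =
    if (futureLogDir.isDefined) false
    else { futureLogDir = Some(dir); true }
}

final class ToyAlterDirManager {
  // One "thread" per (brokerId, fetcherId) key, holding its assigned partitions.
  private val threads = mutable.Map.empty[(Int, Int), mutable.Set[String]]
  private var created = 0

  def threadsCreated: Int = created

  def assignedPartitions(brokerId: Int, fetcherId: Int): Set[String] =
    threads.get((brokerId, fetcherId)).map(_.toSet).getOrElse(Set.empty[String])

  // Re-adding a partition under an existing key creates no new thread.
  def addFetcherForPartition(brokerId: Int, fetcherId: Int, partition: String): Unit = {
    val assigned = threads.getOrElseUpdate((brokerId, fetcherId), {
      created += 1
      mutable.Set.empty[String]
    })
    assigned += partition
  }
}

object ToyReplicaManager {
  // Option 1: add the fetcher only if this call created the future log.
  def addOnlyIfCreated(p: ToyPartition, dir: String, m: ToyAlterDirManager): Unit =
    if (p.createFutureLogIfAbsent(dir)) m.addFetcherForPartition(0, 0, p.name)

  // Option 2: always add, relying on the second add being a no-op.
  def addAlways(p: ToyPartition, dir: String, m: ToyAlterDirManager): Unit = {
    p.createFutureLogIfAbsent(dir)
    m.addFetcherForPartition(0, 0, p.name)
  }
}
```

In this model both options end in the same state, so the choice is about making the intent explicit rather than about correctness.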