kamalcph commented on code in PR #16653:
URL: https://github.com/apache/kafka/pull/16653#discussion_r1699454790


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -507,17 +525,25 @@ public void stopPartitions(Set<StopPartition> stopPartitions,
                 LOGGER.error("Error while stopping the partition: {}", stopPartition, ex);
             }
         }
-        // Note `deleteLocalLog` will always be true when `deleteRemoteLog` is true but not the other way around.
         Set<TopicIdPartition> deleteLocalPartitions = stopPartitions.stream()
                 .filter(sp -> sp.deleteLocalLog() && topicIdByPartitionMap.containsKey(sp.topicPartition()))
                 .map(sp -> new TopicIdPartition(topicIdByPartitionMap.get(sp.topicPartition()), sp.topicPartition()))
                 .collect(Collectors.toSet());
+
         if (!deleteLocalPartitions.isEmpty()) {
-            // NOTE: In ZK mode, this#stopPartitions method is called when Replica state changes to Offline and
-            // ReplicaDeletionStarted
-            remoteLogMetadataManager.onStopPartitions(deleteLocalPartitions);
             deleteLocalPartitions.forEach(tpId -> topicIdByPartitionMap.remove(tpId.topicPartition()));
         }
+
+        // NOTE: In ZK mode, this#stopPartitions method is called when Replica state changes to Offline and
+        // ReplicaDeletionStarted
+        Set<TopicIdPartition> stopRLMMPartitions = stopPartitions.stream()

Review Comment:
   Can we merge L539 and L528 together?
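The merge suggested above (building both partition sets in a single pass over `stopPartitions` instead of two separate stream pipelines) could look roughly like the sketch below. Kafka's `StopPartition` and `TopicIdPartition` types are not reproduced here; a hypothetical `StopPartition` record with the relevant flags stands in, so this shows only the single-pass shape, not the actual patch.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class MergedStopPartitionPasses {
    // Hypothetical stand-in for Kafka's StopPartition: the partition to stop,
    // plus flags for deleting the local log and stopping remote-log metadata.
    record StopPartition(String topicPartition, boolean deleteLocalLog,
                         boolean stopRemoteLogMetadata) {}

    public static void main(String[] args) {
        List<StopPartition> stopPartitions = List.of(
                new StopPartition("topicA-0", true, true),
                new StopPartition("topicB-1", false, true),
                new StopPartition("topicC-2", false, false));

        // One loop populates both sets, instead of filtering the same
        // collection twice with two separate stream pipelines.
        Set<String> deleteLocalPartitions = new HashSet<>();
        Set<String> stopRLMMPartitions = new HashSet<>();
        for (StopPartition sp : stopPartitions) {
            if (sp.deleteLocalLog()) {
                deleteLocalPartitions.add(sp.topicPartition());
            }
            if (sp.stopRemoteLogMetadata()) {
                stopRLMMPartitions.add(sp.topicPartition());
            }
        }

        System.out.println(deleteLocalPartitions); // [topicA-0]
        System.out.println(stopRLMMPartitions);
    }
}
```

A plain loop is used here for clarity; `Collectors.teeing` could achieve the same single pass in stream style if the stream form is preferred.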



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -437,30 +437,48 @@ public void onLeadershipChange(Set<Partition> partitionsBecomeLeader,
             throw new KafkaException("RemoteLogManager is not configured when remote storage system is enabled");
         }
 
-        Set<TopicIdPartition> leaderPartitions = filterPartitions(partitionsBecomeLeader)
-                .map(p -> new TopicIdPartition(topicIds.get(p.topic()), p.topicPartition())).collect(Collectors.toSet());
+        Map<TopicIdPartition, Boolean> leaderPartitions = filterPartitions(partitionsBecomeLeader)
+                .collect(Collectors.toMap(p -> new TopicIdPartition(topicIds.get(p.topic()), p.topicPartition()),
+                        p -> p.log().exists(log -> log.config().remoteCopyDisabled())));
 
-        Set<TopicIdPartition> followerPartitions = filterPartitions(partitionsBecomeFollower)
-                .map(p -> new TopicIdPartition(topicIds.get(p.topic()), p.topicPartition())).collect(Collectors.toSet());
+        Map<TopicIdPartition, Boolean> followerPartitions = filterPartitions(partitionsBecomeFollower)
+                .collect(Collectors.toMap(p -> new TopicIdPartition(topicIds.get(p.topic()), p.topicPartition()),
+                        p -> p.log().exists(log -> log.config().remoteCopyDisabled())));
 
         if (!leaderPartitions.isEmpty() || !followerPartitions.isEmpty()) {
             LOGGER.debug("Effective topic partitions after filtering compact and internal topics, leaders: {} and followers: {}",
                     leaderPartitions, followerPartitions);
 
-            leaderPartitions.forEach(this::cacheTopicPartitionIds);
-            followerPartitions.forEach(this::cacheTopicPartitionIds);
+            leaderPartitions.forEach((tp, __) -> cacheTopicPartitionIds(tp));
+            followerPartitions.forEach((tp, __) -> cacheTopicPartitionIds(tp));
 
-            remoteLogMetadataManager.onPartitionLeadershipChanges(leaderPartitions, followerPartitions);
-            followerPartitions.forEach(this::doHandleFollowerPartition);
+            remoteLogMetadataManager.onPartitionLeadershipChanges(leaderPartitions.keySet(), followerPartitions.keySet());
+            followerPartitions.forEach((tp, __) -> doHandleFollowerPartition(tp));
 
             // If this node was the previous leader for the partition, then the RLMTask might be running in the
             // background thread and might emit metrics. So, removing the metrics after marking this node as follower.
-            followerPartitions.forEach(this::removeRemoteTopicPartitionMetrics);
+            followerPartitions.forEach((tp, __) -> removeRemoteTopicPartitionMetrics(tp));
 
             leaderPartitions.forEach(this::doHandleLeaderPartition);
         }
     }
 
+    public void stopLeaderCopyRLMTasks(Set<Partition> partitions) {
+        for (Partition partition : partitions) {
+            TopicPartition tp = partition.topicPartition();
+            if (topicIdByPartitionMap.containsKey(tp)) {
+                TopicIdPartition tpId = new TopicIdPartition(topicIdByPartitionMap.get(tp), tp);
+                leaderCopyRLMTasks.computeIfPresent(tpId, (topicIdPartition, task) -> {
+                    LOGGER.info("Resetting remote copy lag metrics for tpId: {}", tpId);
+                    ((RLMCopyTask) task.rlmTask).recordLagStats(0L, 0L);

Review Comment:
   We have to handle concurrency here: the RLMCopy thread might overwrite the value after we reset it to zero. Can we cancel the task first, then reset the lag to zero?
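The race the comment describes, and the cancel-first ordering it proposes, can be sketched with JDK concurrency primitives. This is not Kafka's RLMTask machinery: the `ScheduledExecutorService` and `AtomicLong` below are hypothetical stand-ins for the copy task and the lag gauge. The point is the ordering: stop the task and wait for any in-flight run to finish, and only then reset the metric, so the reset cannot be overwritten.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class CancelThenReset {
    public static void main(String[] args) throws InterruptedException {
        // Stand-in for the copy-lag gauge that the copy task keeps updating.
        AtomicLong copyLagBytes = new AtomicLong(0L);
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();

        // Stand-in for the background copy task: it periodically recomputes
        // the lag, so a concurrent reset could be overwritten by the next run.
        ScheduledFuture<?> task = pool.scheduleWithFixedDelay(
                () -> copyLagBytes.set(1024L), 0, 10, TimeUnit.MILLISECONDS);

        Thread.sleep(50);

        // Cancel first so no new run is scheduled; cancel(false) lets a
        // run that is already executing finish instead of interrupting it.
        task.cancel(false);
        // Then wait until any in-flight run has completed...
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.SECONDS);
        // ...and only now reset: nothing can overwrite the value anymore.
        copyLagBytes.set(0L);

        System.out.println(copyLagBytes.get()); // 0
    }
}
```

Resetting before the cancel (or without waiting for termination) would leave a window where a final task run writes a stale lag value over the zero, which is exactly the overwrite the review comment warns about.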



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
