kamalcph commented on code in PR #16653:
URL: https://github.com/apache/kafka/pull/16653#discussion_r1699561489
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -437,30 +437,48 @@ public void onLeadershipChange(Set<Partition>
partitionsBecomeLeader,
throw new KafkaException("RemoteLogManager is not configured when
remote storage system is enabled");
}
- Set<TopicIdPartition> leaderPartitions =
filterPartitions(partitionsBecomeLeader)
- .map(p -> new TopicIdPartition(topicIds.get(p.topic()),
p.topicPartition())).collect(Collectors.toSet());
+ Map<TopicIdPartition, Boolean> leaderPartitions =
filterPartitions(partitionsBecomeLeader)
+ .collect(Collectors.toMap(p -> new
TopicIdPartition(topicIds.get(p.topic()), p.topicPartition()),
+ p -> p.log().exists(log ->
log.config().remoteCopyDisabled())));
- Set<TopicIdPartition> followerPartitions =
filterPartitions(partitionsBecomeFollower)
- .map(p -> new TopicIdPartition(topicIds.get(p.topic()),
p.topicPartition())).collect(Collectors.toSet());
+ Map<TopicIdPartition, Boolean> followerPartitions =
filterPartitions(partitionsBecomeFollower)
+ .collect(Collectors.toMap(p -> new
TopicIdPartition(topicIds.get(p.topic()), p.topicPartition()),
+ p -> p.log().exists(log ->
log.config().remoteCopyDisabled())));
if (!leaderPartitions.isEmpty() || !followerPartitions.isEmpty()) {
LOGGER.debug("Effective topic partitions after filtering compact
and internal topics, leaders: {} and followers: {}",
leaderPartitions, followerPartitions);
- leaderPartitions.forEach(this::cacheTopicPartitionIds);
- followerPartitions.forEach(this::cacheTopicPartitionIds);
+ leaderPartitions.forEach((tp, __) -> cacheTopicPartitionIds(tp));
+ followerPartitions.forEach((tp, __) -> cacheTopicPartitionIds(tp));
-
remoteLogMetadataManager.onPartitionLeadershipChanges(leaderPartitions,
followerPartitions);
- followerPartitions.forEach(this::doHandleFollowerPartition);
+
remoteLogMetadataManager.onPartitionLeadershipChanges(leaderPartitions.keySet(),
followerPartitions.keySet());
+ followerPartitions.forEach((tp, __) ->
doHandleFollowerPartition(tp));
// If this node was the previous leader for the partition, then
the RLMTask might be running in the
// background thread and might emit metrics. So, removing the
metrics after marking this node as follower.
-
followerPartitions.forEach(this::removeRemoteTopicPartitionMetrics);
+ followerPartitions.forEach((tp, __) ->
removeRemoteTopicPartitionMetrics(tp));
leaderPartitions.forEach(this::doHandleLeaderPartition);
}
}
+ public void stopLeaderCopyRLMTasks(Set<Partition> partitions) {
+ for (Partition partition : partitions) {
+ TopicPartition tp = partition.topicPartition();
+ if (topicIdByPartitionMap.containsKey(tp)) {
+ TopicIdPartition tpId = new
TopicIdPartition(topicIdByPartitionMap.get(tp), tp);
+ leaderCopyRLMTasks.computeIfPresent(tpId, (topicIdPartition,
task) -> {
+ LOGGER.info("Cancelling the copy RLM task for tpId: {}",
tpId);
+ task.cancel();
+ LOGGER.info("Resetting remote copy lag metrics for tpId:
{}", tpId);
+ ((RLMCopyTask) task.rlmTask).recordLagStats(0L, 0L);
Review Comment:
We have to refactor the `((RLMCopyTask) task.rlmTask).recordLagStats`
method. Internally, it checks the task's cancellation status before emitting the
metric, so this call, made right after `task.cancel()`, would not reset the lag
metrics. We have to move the `isCancelled` check outside of the
`RLMCopyTask#recordLagStats` method.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]