kamalcph commented on code in PR #16653:
URL: https://github.com/apache/kafka/pull/16653#discussion_r1699454790
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -507,17 +525,25 @@ public void stopPartitions(Set<StopPartition> stopPartitions,
                 LOGGER.error("Error while stopping the partition: {}", stopPartition, ex);
             }
         }
-        // Note `deleteLocalLog` will always be true when `deleteRemoteLog` is true but not the other way around.
         Set<TopicIdPartition> deleteLocalPartitions = stopPartitions.stream()
                 .filter(sp -> sp.deleteLocalLog() && topicIdByPartitionMap.containsKey(sp.topicPartition()))
                 .map(sp -> new TopicIdPartition(topicIdByPartitionMap.get(sp.topicPartition()), sp.topicPartition()))
                 .collect(Collectors.toSet());
+
         if (!deleteLocalPartitions.isEmpty()) {
-            // NOTE: In ZK mode, this#stopPartitions method is called when Replica state changes to Offline and
-            // ReplicaDeletionStarted
-            remoteLogMetadataManager.onStopPartitions(deleteLocalPartitions);
             deleteLocalPartitions.forEach(tpId -> topicIdByPartitionMap.remove(tpId.topicPartition()));
         }
+
+        // NOTE: In ZK mode, this#stopPartitions method is called when Replica state changes to Offline and
+        // ReplicaDeletionStarted
+        Set<TopicIdPartition> stopRLMMPartitions = stopPartitions.stream()
Review Comment:
Can we merge L539 and L528 together? Both are stream passes over `stopPartitions`; a single pass could build both sets.
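A possible shape for the merged pass. This is a standalone sketch with placeholder types: `TopicPartition`, `TopicIdPartition`, and `StopPartition` are simplified stand-ins for the Kafka classes, and since the second filter's predicate is cut off in the diff, `stopRLMM()` here is a hypothetical flag, not the real accessor.

```java
import java.util.*;

// Hypothetical minimal stand-ins for the Kafka types, used only to
// illustrate merging the two stream pipelines into one loop.
public class MergedStopPass {
    record TopicPartition(String topic, int partition) {}
    record TopicIdPartition(UUID topicId, TopicPartition tp) {}
    record StopPartition(TopicPartition tp, boolean deleteLocalLog, boolean stopRLMM) {}

    // One pass over stopPartitions populates both sets, instead of two
    // separate stream pipelines that each re-check topicIdByPartitionMap.
    static Map<String, Set<TopicIdPartition>> partition(
            Collection<StopPartition> stopPartitions,
            Map<TopicPartition, UUID> topicIdByPartitionMap) {
        Set<TopicIdPartition> deleteLocal = new HashSet<>();
        Set<TopicIdPartition> stopRlmm = new HashSet<>();
        for (StopPartition sp : stopPartitions) {
            UUID id = topicIdByPartitionMap.get(sp.tp());
            if (id == null) continue; // unknown partition: skip, as both filters did
            TopicIdPartition tpId = new TopicIdPartition(id, sp.tp());
            if (sp.deleteLocalLog()) deleteLocal.add(tpId);
            if (sp.stopRLMM()) stopRlmm.add(tpId);
        }
        return Map.of("deleteLocal", deleteLocal, "stopRLMM", stopRlmm);
    }

    public static void main(String[] args) {
        TopicPartition tp = new TopicPartition("logs", 0);
        Map<String, Set<TopicIdPartition>> out = partition(
                List.of(new StopPartition(tp, true, false)),
                Map.of(tp, UUID.randomUUID()));
        System.out.println(out.get("deleteLocal").size() + " to delete locally");
    }
}
```

Besides avoiding the double lookup in `topicIdByPartitionMap`, the single pass keeps the two membership decisions for a given partition next to each other, which makes their relationship easier to audit.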
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -437,30 +437,48 @@ public void onLeadershipChange(Set<Partition> partitionsBecomeLeader,
             throw new KafkaException("RemoteLogManager is not configured when remote storage system is enabled");
         }
-        Set<TopicIdPartition> leaderPartitions = filterPartitions(partitionsBecomeLeader)
-                .map(p -> new TopicIdPartition(topicIds.get(p.topic()), p.topicPartition())).collect(Collectors.toSet());
+        Map<TopicIdPartition, Boolean> leaderPartitions = filterPartitions(partitionsBecomeLeader)
+                .collect(Collectors.toMap(p -> new TopicIdPartition(topicIds.get(p.topic()), p.topicPartition()),
+                        p -> p.log().exists(log -> log.config().remoteCopyDisabled())));
-        Set<TopicIdPartition> followerPartitions = filterPartitions(partitionsBecomeFollower)
-                .map(p -> new TopicIdPartition(topicIds.get(p.topic()), p.topicPartition())).collect(Collectors.toSet());
+        Map<TopicIdPartition, Boolean> followerPartitions = filterPartitions(partitionsBecomeFollower)
+                .collect(Collectors.toMap(p -> new TopicIdPartition(topicIds.get(p.topic()), p.topicPartition()),
+                        p -> p.log().exists(log -> log.config().remoteCopyDisabled())));
         if (!leaderPartitions.isEmpty() || !followerPartitions.isEmpty()) {
             LOGGER.debug("Effective topic partitions after filtering compact and internal topics, leaders: {} and followers: {}",
                     leaderPartitions, followerPartitions);
-            leaderPartitions.forEach(this::cacheTopicPartitionIds);
-            followerPartitions.forEach(this::cacheTopicPartitionIds);
+            leaderPartitions.forEach((tp, __) -> cacheTopicPartitionIds(tp));
+            followerPartitions.forEach((tp, __) -> cacheTopicPartitionIds(tp));
-            remoteLogMetadataManager.onPartitionLeadershipChanges(leaderPartitions, followerPartitions);
-            followerPartitions.forEach(this::doHandleFollowerPartition);
+            remoteLogMetadataManager.onPartitionLeadershipChanges(leaderPartitions.keySet(), followerPartitions.keySet());
+            followerPartitions.forEach((tp, __) -> doHandleFollowerPartition(tp));
             // If this node was the previous leader for the partition, then the RLMTask might be running in the
             // background thread and might emit metrics. So, removing the metrics after marking this node as follower.
-            followerPartitions.forEach(this::removeRemoteTopicPartitionMetrics);
+            followerPartitions.forEach((tp, __) -> removeRemoteTopicPartitionMetrics(tp));
             leaderPartitions.forEach(this::doHandleLeaderPartition);
         }
     }
+    public void stopLeaderCopyRLMTasks(Set<Partition> partitions) {
+        for (Partition partition : partitions) {
+            TopicPartition tp = partition.topicPartition();
+            if (topicIdByPartitionMap.containsKey(tp)) {
+                TopicIdPartition tpId = new TopicIdPartition(topicIdByPartitionMap.get(tp), tp);
+                leaderCopyRLMTasks.computeIfPresent(tpId, (topicIdPartition, task) -> {
+                    LOGGER.info("Resetting remote copy lag metrics for tpId: {}", tpId);
+                    ((RLMCopyTask) task.rlmTask).recordLagStats(0L, 0L);
Review Comment:
We have to handle concurrency here: the RLMCopyTask thread might overwrite the value after we reset it to zero. Can we cancel the task first, then reset the lag to zero?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]