jsancio commented on code in PR #19762:
URL: https://github.com/apache/kafka/pull/19762#discussion_r2190622591
##########
metadata/src/main/java/org/apache/kafka/image/TopicsImage.java:
##########
@@ -119,4 +127,40 @@ public Map<Uuid, String> topicIdToNameView() {
     public String toString() {
         return new TopicsImageByNameNode(this).stringify();
     }
+
+    /**
+     * Returns true if the given topic partition should not be on the current broker according to the metadata image.
+     *
+     * @param newTopicsImage The new topics image after broker has been reloaded
+     * @param brokerId The ID of the current broker.
+     * @param topicId The topic ID
+     * @param partitionId The partition ID
+     * @param log The log
+     * @return true if the topic partition should not exist on the broker, false otherwise.
+     */
+    public static boolean isStrayReplica(TopicsImage newTopicsImage, int brokerId, Optional<Uuid> topicId, int partitionId, String log) {
Review Comment:
Why make this static if the first argument is `TopicsImage`? This looks a
lot like an object method where the `this` reference is the `TopicsImage`.
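The sketch below assumes the method keeps its current behavior but drops `static`, so `this` takes the place of the `newTopicsImage` argument. `TopicsImageSketch` and `PartitionSketch` are hypothetical stand-ins for `TopicsImage` and `PartitionRegistration`. `java.util.UUID` stands in for Kafka's `Uuid`. The `log` parameter and the logging are left out for brevity.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

// Hypothetical stand-in for PartitionRegistration: only the replica list is modeled.
record PartitionSketch(int[] replicas) {
    boolean hasReplica(int brokerId) {
        for (int replica : replicas) {
            if (replica == brokerId) {
                return true;
            }
        }
        return false;
    }
}

// Hypothetical stand-in for TopicsImage: topic ID -> partition ID -> partition.
final class TopicsImageSketch {
    private final Map<UUID, Map<Integer, PartitionSketch>> topics;

    TopicsImageSketch(Map<UUID, Map<Integer, PartitionSketch>> topics) {
        this.topics = topics;
    }

    // Empty if either the topic or the partition is absent from the image.
    Optional<PartitionSketch> getPartition(UUID topicId, int partitionId) {
        return Optional.ofNullable(topics.get(topicId))
            .map(byPartition -> byPartition.get(partitionId));
    }

    // Instance method: `this` replaces the former `newTopicsImage` argument.
    boolean isStrayReplica(int brokerId, Optional<UUID> topicId, int partitionId) {
        return topicId
            .flatMap(id -> getPartition(id, partitionId))
            .map(partition -> !partition.hasReplica(brokerId))
            // No topic ID, or topic/partition missing from the image: stray, as in the PR.
            .orElse(true);
    }
}
```

The call site would then read `newImage.topics().isStrayReplica(brokerId, ...)` instead of passing the image as the first argument.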
##########
metadata/src/main/java/org/apache/kafka/image/TopicsImage.java:
##########
@@ -119,4 +127,40 @@ public Map<Uuid, String> topicIdToNameView() {
     public String toString() {
         return new TopicsImageByNameNode(this).stringify();
     }
+
+    /**
+     * Returns true if the given topic partition should not be on the current broker according to the metadata image.
+     *
+     * @param newTopicsImage The new topics image after broker has been reloaded
+     * @param brokerId The ID of the current broker.
+     * @param topicId The topic ID
+     * @param partitionId The partition ID
+     * @param log The log
+     * @return true if the topic partition should not exist on the broker, false otherwise.
+     */
+    public static boolean isStrayReplica(TopicsImage newTopicsImage, int brokerId, Optional<Uuid> topicId, int partitionId, String log) {
+        if (topicId.isEmpty()) {
+            // Missing topic ID could result from storage failure or unclean shutdown after topic creation but before flushing
+            // data to the `partition.metadata` file. And before appending data to the log, the `partition.metadata` is always
+            // flushed to disk. So if the topic ID is missing, it mostly means no data was appended, and we can treat this as
+            // a stray log.
+            LOG.info("The topicId does not exist in {}, treat it as a stray log.", log);
+            return true;
+        }
+
+        PartitionRegistration partition = newTopicsImage.getPartition(topicId.get(), partitionId);
+        if (partition == null) {
+            LOG.info("Found stray log dir {}: the topicId {} does not exist in the metadata image.", log, topicId);
+            return true;
+        } else {
+            List<Integer> replicas = Arrays.stream(partition.replicas).boxed().toList();
+            if (!replicas.contains(brokerId)) {
+                LOG.info("Found stray log dir {}: the current replica assignment {} does not contain the local brokerId {}.",
+                    log, replicas.stream().map(String::valueOf).collect(Collectors.joining(", ", "[", "]")), brokerId);
+                return true;
+            } else {
+                return false;
+            }
+        }
+    }
Review Comment:
To me this should be functionality of the log manager. Maybe the
TopicsImage method should just return all of the topic IDs that don't exist in
the given image for the given broker.
This would allow you to remove the added static logger.
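The sketch below shows one way to split the work. The image answers only which local partitions it does not assign to a broker. The log manager owns the "stray" decision and the logging. It returns partitions rather than bare topic IDs, because a topic can keep some partitions on the broker and lose others. All class names are hypothetical. `java.util.logging.Logger` stands in for Kafka's SLF4J logging.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.logging.Logger;
import java.util.stream.Collectors;

// Hypothetical key for a partition log found on local disk.
record LocalPartition(UUID topicId, int partition) { }

// Hypothetical image: knows replica assignments, knows nothing about logs or "stray".
final class ReplicaImageSketch {
    private final Map<LocalPartition, List<Integer>> replicas;

    ReplicaImageSketch(Map<LocalPartition, List<Integer>> replicas) {
        this.replicas = replicas;
    }

    // Pure query: the members of `local` that this image does not assign to `brokerId`
    // (partitions absent from the image count as unassigned). No logging here.
    Set<LocalPartition> notAssignedTo(int brokerId, Set<LocalPartition> local) {
        return local.stream()
            .filter(p -> !replicas.getOrDefault(p, List.of()).contains(brokerId))
            .collect(Collectors.toSet());
    }
}

// Hypothetical log-manager side: owns the stray decision and the logger.
final class LogManagerSketch {
    private static final Logger LOG = Logger.getLogger(LogManagerSketch.class.getName());

    static Set<LocalPartition> findStrayLogs(ReplicaImageSketch image, int brokerId,
                                             Set<LocalPartition> localLogs) {
        Set<LocalPartition> stray = image.notAssignedTo(brokerId, localLogs);
        for (LocalPartition p : stray) {
            LOG.info("Found stray log for " + p + ": not assigned to broker " + brokerId);
        }
        return stray;
    }
}
```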
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftLog.java:
##########
Review Comment:
I am thinking that we should make this internal by moving the implementation
to `o.a.k.r.internals.KafkaRaftLog.java`. The same goes for the accompanying
test suite file.
##########
metadata/src/main/java/org/apache/kafka/image/TopicsImage.java:
##########
@@ -119,4 +127,40 @@ public Map<Uuid, String> topicIdToNameView() {
     public String toString() {
         return new TopicsImageByNameNode(this).stringify();
     }
+
+    /**
+     * Returns true if the given topic partition should not be on the current broker according to the metadata image.
+     *
+     * @param newTopicsImage The new topics image after broker has been reloaded
+     * @param brokerId The ID of the current broker.
+     * @param topicId The topic ID
+     * @param partitionId The partition ID
+     * @param log The log
+     * @return true if the topic partition should not exist on the broker, false otherwise.
+     */
+    public static boolean isStrayReplica(TopicsImage newTopicsImage, int brokerId, Optional<Uuid> topicId, int partitionId, String log) {
+        if (topicId.isEmpty()) {
+            // Missing topic ID could result from storage failure or unclean shutdown after topic creation but before flushing
+            // data to the `partition.metadata` file. And before appending data to the log, the `partition.metadata` is always
+            // flushed to disk. So if the topic ID is missing, it mostly means no data was appended, and we can treat this as
+            // a stray log.
+            LOG.info("The topicId does not exist in {}, treat it as a stray log.", log);
+            return true;
+        }
Review Comment:
How about letting the log manager make this decision? In Kafka 4.0 all
topics must have a topic id.
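The sketch below keeps the missing-topic-ID policy in the log manager. It consults the image only through a hypothetical `(topicId, partition) -> assigned?` lookup. Treating a log without a topic ID as stray mirrors the behavior in this PR and is an assumption here. Since Kafka 4.0 requires topic IDs, the log manager could just as well fail or warn instead.

```java
import java.util.Optional;
import java.util.UUID;
import java.util.function.BiPredicate;

final class StrayDecisionSketch {
    private StrayDecisionSketch() { }

    // `isAssignedInImage` is a hypothetical lookup answering: does the metadata image
    // list this broker as a replica of (topicId, partition)?
    static boolean isStray(Optional<UUID> topicId, int partition,
                           BiPredicate<UUID, Integer> isAssignedInImage) {
        if (topicId.isEmpty()) {
            // Policy decided here, in the log manager, not in the image.
            // Assumption: keep the PR's behavior and treat the log as stray.
            return true;
        }
        return !isAssignedInImage.test(topicId.get(), partition);
    }
}
```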
##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala:
##########
@@ -340,7 +339,7 @@ class BrokerMetadataPublisher(
     // recovery-from-unclean-shutdown if required.
     logManager.startup(
       metadataCache.getAllTopics().asScala,
-      isStray = log => JLogManager.isStrayKraftReplica(brokerId, newImage.topics(), log)
+      isStray = log => TopicsImage.isStrayReplica(newImage.topics(), brokerId, log.topicId(), log.topicPartition().partition(), log.toString)
Review Comment:
Minor, but to me stray partitions are in the log manager, not in the topics
image. That is, the log manager has partition entries that are not in the
latest topics image.
In some sense the log manager understands the topics image and makes sure that
the two match. The topics image doesn't know anything about "stray partitions"
or the log manager.
If you still want to move the functionality to TopicsImage, maybe make it a
method (not static) with the signature
`Stream<TopicIdPartition> deletedPartitionsForReplica(int brokerId, Stream<TopicIdPartition>)`.
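The sketch below implements the suggested non-static `deletedPartitionsForReplica`. It assumes the image can be reduced to a topic ID -> partition -> replica set map. `TopicsImageWithDeletes` and `TopicIdPartitionSketch` are hypothetical stand-ins for `TopicsImage` and `TopicIdPartition`. The comment leaves the second parameter unnamed, so the name `partitions` is an assumption.

```java
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Stream;

// Hypothetical stand-in for TopicIdPartition.
record TopicIdPartitionSketch(UUID topicId, int partition) { }

final class TopicsImageWithDeletes {
    // topic ID -> partition ID -> replica broker IDs
    private final Map<UUID, Map<Integer, Set<Integer>>> assignments;

    TopicsImageWithDeletes(Map<UUID, Map<Integer, Set<Integer>>> assignments) {
        this.assignments = assignments;
    }

    // Non-static: `this` is the image. Returns the partitions from `partitions` that this
    // image does not assign to `brokerId`: the topic is gone, the partition is gone, or
    // the replica set no longer contains the broker.
    Stream<TopicIdPartitionSketch> deletedPartitionsForReplica(
            int brokerId, Stream<TopicIdPartitionSketch> partitions) {
        return partitions.filter(tp -> {
            Map<Integer, Set<Integer>> byPartition = assignments.get(tp.topicId());
            if (byPartition == null) {
                return true;
            }
            Set<Integer> replicas = byPartition.get(tp.partition());
            return replicas == null || !replicas.contains(brokerId);
        });
    }
}
```

The log manager would pass a stream of its local partitions and delete, or log, whatever comes back. The image itself stays unaware of logs and of the term "stray".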
##########
checkstyle/import-control.xml:
##########
@@ -486,6 +486,7 @@
<allow class="org.apache.kafka.common.compress.Compression" exact-match="true" />
<allow pkg="org.apache.kafka.common.config" />
<allow pkg="org.apache.kafka.common.feature" />
+ <allow pkg="org.apache.kafka.common.internals" />
Review Comment:
Note that, to me, this means the common module is not organized correctly
if the raft module needs types in the "internals" namespace. The same comment
applies to the change below, which also includes `o.a.k.s.internals.log`.