jtao15 commented on code in PR #15177:
URL: https://github.com/apache/pinot/pull/15177#discussion_r1980338451
##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java:
##########
@@ -565,19 +565,46 @@ static List<SegmentZKMetadata> filterSegmentsBasedOnStatus(TableType tableType,
     // if new records are consumed later, the MergeRollupTask may have already moved watermarks forward, and may
     // not be able to merge those lately-created segments -- we assume that users will have a way to backfill those
     // records correctly.
-    long earliestStartTimeMsOfInProgressSegments = Long.MAX_VALUE;
+    //
+    // Based on the following considerations:

Review Comment:
   (nit) let's remove/update the comments on L549-L551.

##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java:
##########
@@ -565,19 +565,46 @@ static List<SegmentZKMetadata> filterSegmentsBasedOnStatus(TableType tableType,
     // if new records are consumed later, the MergeRollupTask may have already moved watermarks forward, and may
     // not be able to merge those lately-created segments -- we assume that users will have a way to backfill those
     // records correctly.
-    long earliestStartTimeMsOfInProgressSegments = Long.MAX_VALUE;
+    //
+    // Based on the following considerations:
+    // 1. The BufferTime configuration will do the work of NOT merging the most recent segments, it will cover most
+    // of the cases
+    // 2. If one wants to merge the most recent segments, and hence changes the BufferTime to 0, we need to make sure
+    // we do NOT merge the consuming segments
+    // 3. There is a corner case of PauseConsumption, the function will seal the consuming segments and NOT create
+    // a new one, nor upload the offset metadata.
+    // We decide to ONLY filter out the consuming segments and most recent completed segments for each partition.
+    Map<Integer, SegmentZKMetadata> latestCompletedSegmentInEachPartition = new HashMap<>();
+    List<SegmentZKMetadata> filteredSegments = new ArrayList<>();
     for (SegmentZKMetadata segmentZKMetadata : allSegments) {
-      if (!segmentZKMetadata.getStatus().isCompleted()
-          && segmentZKMetadata.getTotalDocs() > 0
-          && segmentZKMetadata.getStartTimeMs() < earliestStartTimeMsOfInProgressSegments) {
-        earliestStartTimeMsOfInProgressSegments = segmentZKMetadata.getStartTimeMs();
+      if (segmentZKMetadata.getStatus().isCompleted()) {
+        // completed segments
+        String[] segmentIdComponents = segmentZKMetadata.getSegmentName().split("__");

Review Comment:
   Better to leverage LLCSegmentName.isLLCSegment() and getPartitionGroupId().

##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java:
##########
@@ -565,19 +565,46 @@ static List<SegmentZKMetadata> filterSegmentsBasedOnStatus(TableType tableType,
     // if new records are consumed later, the MergeRollupTask may have already moved watermarks forward, and may
     // not be able to merge those lately-created segments -- we assume that users will have a way to backfill those
     // records correctly.
-    long earliestStartTimeMsOfInProgressSegments = Long.MAX_VALUE;
+    //
+    // Based on the following considerations:
+    // 1. The BufferTime configuration will do the work of NOT merging the most recent segments, it will cover most
+    // of the cases
+    // 2. If one wants to merge the most recent segments, and hence changes the BufferTime to 0, we need to make sure
+    // we do NOT merge the consuming segments
+    // 3. There is a corner case of PauseConsumption, the function will seal the consuming segments and NOT create
+    // a new one, nor upload the offset metadata.
+    // We decide to ONLY filter out the consuming segments and most recent completed segments for each partition.
+    Map<Integer, SegmentZKMetadata> latestCompletedSegmentInEachPartition = new HashMap<>();
+    List<SegmentZKMetadata> filteredSegments = new ArrayList<>();
     for (SegmentZKMetadata segmentZKMetadata : allSegments) {
-      if (!segmentZKMetadata.getStatus().isCompleted()
-          && segmentZKMetadata.getTotalDocs() > 0
-          && segmentZKMetadata.getStartTimeMs() < earliestStartTimeMsOfInProgressSegments) {
-        earliestStartTimeMsOfInProgressSegments = segmentZKMetadata.getStartTimeMs();
+      if (segmentZKMetadata.getStatus().isCompleted()) {
+        // completed segments
+        String[] segmentIdComponents = segmentZKMetadata.getSegmentName().split("__");
+        if (segmentIdComponents.length > 1) {
+          // realtime segments
+          int partitionId = Integer.parseInt(segmentIdComponents[1]);
+          if (!latestCompletedSegmentInEachPartition.containsKey(partitionId)) {
+            // latest
+            latestCompletedSegmentInEachPartition.put(partitionId, segmentZKMetadata);
+          } else {
+            long currentOffset = Long.parseLong(segmentZKMetadata.getEndOffset());
+            long maxOffset = Long.parseLong(latestCompletedSegmentInEachPartition.get(partitionId).getEndOffset());
+            if (currentOffset > maxOffset) {
+              // latest
+              filteredSegments.add(latestCompletedSegmentInEachPartition.get(partitionId));
+              latestCompletedSegmentInEachPartition.put(partitionId, segmentZKMetadata);
+            } else {
+              // not latest
+              filteredSegments.add(segmentZKMetadata);
+            }
+          }
+        } else {
+          // not-realtime segments

Review Comment:
   (nit) In this case, they are still realtime segments, but either uploaded or merged/rolled up. Maybe update the comment to make it clearer?

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
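The sketch below models the filtering rule from the PR's new comment block in isolation. That rule never merges consuming segments and holds back the most recent completed segment of each partition. The sketch pairs it with a stricter segment-name check, in the spirit of the LLCSegmentName suggestion.

Everything in the sketch is hypothetical:
- SegmentInfo stands in for SegmentZKMetadata.
- partitionIdOf only imitates LLCSegmentName. It assumes the four-part table__partitionGroupId__sequenceNumber__creationTime layout and is not Pinot's API.
- End offsets are compared as longs, as the PR's Long.parseLong calls assume.
- Names that do not parse as LLC names (for example uploaded or merged/rolled-up segments) are passed through as merge candidates. That is an assumption about what the PR's else branch does.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;

public class MergeCandidateFilterSketch {

  // Hypothetical stand-in for SegmentZKMetadata: only the fields this filter needs.
  public record SegmentInfo(String name, boolean completed, long endOffset) { }

  // Hypothetical imitation of an LLC segment-name check (NOT Pinot's LLCSegmentName API):
  // assumes exactly four "__"-separated parts with numeric partition and sequence fields.
  public static OptionalInt partitionIdOf(String segmentName) {
    String[] parts = segmentName.split("__");
    if (parts.length != 4) {
      return OptionalInt.empty();
    }
    try {
      int partitionId = Integer.parseInt(parts[1]);
      Integer.parseInt(parts[2]); // the sequence number must also be numeric
      return partitionId >= 0 ? OptionalInt.of(partitionId) : OptionalInt.empty();
    } catch (NumberFormatException e) {
      return OptionalInt.empty();
    }
  }

  // Returns the segments eligible for merging: consuming segments are skipped, and for each
  // partition the completed segment with the highest end offset is held back.
  public static List<SegmentInfo> mergeCandidates(List<SegmentInfo> allSegments) {
    Map<Integer, SegmentInfo> latestCompletedPerPartition = new HashMap<>();
    List<SegmentInfo> candidates = new ArrayList<>();
    for (SegmentInfo segment : allSegments) {
      if (!segment.completed()) {
        continue; // consuming segment: never merge
      }
      OptionalInt maybePartition = partitionIdOf(segment.name());
      if (maybePartition.isEmpty()) {
        // Not an LLC-style name (e.g. uploaded or merged/rolled-up): assumed to be a candidate.
        candidates.add(segment);
        continue;
      }
      int partitionId = maybePartition.getAsInt();
      SegmentInfo previous = latestCompletedPerPartition.get(partitionId);
      if (previous == null) {
        latestCompletedPerPartition.put(partitionId, segment);
      } else if (segment.endOffset() > previous.endOffset()) {
        candidates.add(previous); // the previous "latest" is no longer the latest
        latestCompletedPerPartition.put(partitionId, segment);
      } else {
        candidates.add(segment);
      }
    }
    return candidates;
  }

  public static void main(String[] args) {
    List<SegmentInfo> segments = List.of(
        new SegmentInfo("myTable__0__0__20250301T0000Z", true, 100),
        new SegmentInfo("myTable__0__1__20250302T0000Z", true, 200),
        new SegmentInfo("myTable__0__2__20250303T0000Z", false, 0), // consuming; offset unused
        new SegmentInfo("myTable__1__0__20250301T0000Z", true, 150),
        new SegmentInfo("myTable_merged_0", true, 0)); // hypothetical non-LLC name
    for (SegmentInfo s : mergeCandidates(segments)) {
      System.out.println(s.name());
    }
  }
}
```

Running main prints myTable__0__0__20250301T0000Z and then myTable_merged_0. The remaining segments are excluded for different reasons:
- The partition 0 segment with offset 200 is held back as the latest completed segment of its partition.
- The only partition 1 segment is held back for the same reason.
- The consuming segment is skipped.

Compare this with split("__") plus a length > 1 check. Requiring exactly four parts, with numeric partition and sequence fields, avoids classifying any name that merely contains "__" as an LLC segment.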