sajjad-moradi commented on code in PR #15177:
URL: https://github.com/apache/pinot/pull/15177#discussion_r1980529478

##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java:
##########
@@ -565,19 +566,38 @@ static List<SegmentZKMetadata> filterSegmentsBasedOnStatus(TableType tableType,
     // if new records are consumed later, the MergeRollupTask may have already moved watermarks forward, and may
     // not be able to merge those lately-created segments -- we assume that users will have a way to backfill those
     // records correctly.
-    long earliestStartTimeMsOfInProgressSegments = Long.MAX_VALUE;
+    Map<Integer, LLCSegmentName> latestCompletedSegmentInEachPartition = new HashMap<>();
+    HashSet<String> filteredSegmentNames = new HashSet<>();
     for (SegmentZKMetadata segmentZKMetadata : allSegments) {
-      if (!segmentZKMetadata.getStatus().isCompleted()
-          && segmentZKMetadata.getTotalDocs() > 0
-          && segmentZKMetadata.getStartTimeMs() < earliestStartTimeMsOfInProgressSegments) {
-        earliestStartTimeMsOfInProgressSegments = segmentZKMetadata.getStartTimeMs();
+      if (segmentZKMetadata.getStatus().isCompleted()) {
+        // completed segments
+        if (LLCSegmentName.isLLCSegment(segmentZKMetadata.getSegmentName())) {
+          // realtime segments
+          LLCSegmentName llcSegmentName = new LLCSegmentName(segmentZKMetadata.getSegmentName());
+          int partitionId = llcSegmentName.getPartitionGroupId();
+          if (!latestCompletedSegmentInEachPartition.containsKey(partitionId)) {
+            // current segment is the latest found
+            latestCompletedSegmentInEachPartition.put(llcSegmentName.getPartitionGroupId(), llcSegmentName);
+          } else {
+            if (llcSegmentName.getSequenceNumber() >
+                latestCompletedSegmentInEachPartition.get(partitionId).getSequenceNumber()) {
+              // current segment is the latest found
+              filteredSegmentNames.add(latestCompletedSegmentInEachPartition.get(partitionId).getSegmentName());
+              latestCompletedSegmentInEachPartition.put(partitionId, llcSegmentName);
+            } else {
+              // current segment is not the latest
+              filteredSegmentNames.add(llcSegmentName.getSegmentName());
+            }
+          }
+        } else {
+          // other segments: merged segments, uploaded segments, or ingested offline segments
+          filteredSegmentNames.add(segmentZKMetadata.getSegmentName());
+        }
       }
     }
-    final long finalEarliestStartTimeMsOfInProgressSegments = earliestStartTimeMsOfInProgressSegments;
     return allSegments.stream()
-        .filter(segmentZKMetadata -> segmentZKMetadata.getStatus().isCompleted()
-            && segmentZKMetadata.getStartTimeMs() < finalEarliestStartTimeMsOfInProgressSegments)
-        .collect(Collectors.toList());
+      .filter(a->filteredSegmentNames.contains(a.getSegmentName()))
+      .collect(Collectors.toList());

Review Comment:
We can make this code simpler to read and review by computing only `partitionIdToLatestCompletedSegment`:
```java
Map<Integer, LLCSegmentName> partitionIdToLatestCompletedSegment = new HashMap<>();
for (SegmentZKMetadata segmentZKMetadata : allSegments) {
  if (segmentZKMetadata.getStatus().isCompleted()) {
    String segmentName = segmentZKMetadata.getSegmentName();
    if (LLCSegmentName.isLLCSegment(segmentName)) {
      LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
      // Keep whichever completed segment has the higher sequence number in this partition.
      partitionIdToLatestCompletedSegment.compute(llcSegmentName.getPartitionGroupId(),
          (partId, latestSegment) -> {
            if (latestSegment == null) {
              return llcSegmentName;
            } else {
              return latestSegment.getSequenceNumber() > llcSegmentName.getSequenceNumber()
                  ? latestSegment : llcSegmentName;
            }
          });
    }
  }
}
```
Then, at the end, when we iterate over `allSegments`, we filter out the latest completed segment of each partition.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
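A minimal, self-contained sketch of the suggested two-pass approach, for illustration only. It does not use Pinot's real classes. `SegmentInfo` is a hypothetical stand-in for `SegmentZKMetadata` plus the fields parsed by `LLCSegmentName` (a `null` partition marks a non-LLC segment such as a merged, uploaded, or offline one), and `LatestSegmentFilter.filter` is a hypothetical name. This is not the code that was eventually merged in the PR.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for SegmentZKMetadata + LLCSegmentName (not a Pinot class).
final class SegmentInfo {
  final String name;
  final boolean completed;
  final Integer partitionId;  // null when the segment name is not an LLC segment name
  final int sequenceNumber;

  SegmentInfo(String name, boolean completed, Integer partitionId, int sequenceNumber) {
    this.name = name;
    this.completed = completed;
    this.partitionId = partitionId;
    this.sequenceNumber = sequenceNumber;
  }

  boolean isLlc() {
    return partitionId != null;
  }
}

// Hypothetical helper illustrating the reviewer's suggestion.
final class LatestSegmentFilter {
  private LatestSegmentFilter() {
  }

  static List<SegmentInfo> filter(List<SegmentInfo> allSegments) {
    // Pass 1: remember only the latest completed LLC segment of each partition.
    Map<Integer, SegmentInfo> partitionIdToLatestCompletedSegment = new HashMap<>();
    for (SegmentInfo segment : allSegments) {
      if (segment.completed && segment.isLlc()) {
        partitionIdToLatestCompletedSegment.compute(segment.partitionId, (partId, latest) ->
            (latest == null || latest.sequenceNumber < segment.sequenceNumber) ? segment : latest);
      }
    }
    // Pass 2: keep completed segments, except the latest completed one in each partition.
    List<SegmentInfo> result = new ArrayList<>();
    for (SegmentInfo segment : allSegments) {
      if (!segment.completed) {
        continue;  // in-progress (consuming) segments are never eligible
      }
      if (segment.isLlc()) {
        SegmentInfo latest = partitionIdToLatestCompletedSegment.get(segment.partitionId);
        if (latest != null && latest.name.equals(segment.name)) {
          continue;  // latest completed segment of its partition
        }
      }
      result.add(segment);  // older LLC segments and non-LLC segments are kept
    }
    return result;
  }
}
```

For example, given completed segments with sequence numbers 0 and 1 in partition 0, a consuming segment with sequence number 2 in partition 0, a single completed segment in partition 1, and a completed merged segment, the filter returns only the partition-0 segment with sequence number 0 and the merged segment. Compared with maintaining a separate `filteredSegmentNames` set that entries are moved into and out of, the map holds just one value per partition, and eligibility is decided in a single place at the end.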