sajjad-moradi commented on code in PR #15177:
URL: https://github.com/apache/pinot/pull/15177#discussion_r1980529478


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java:
##########
@@ -565,19 +566,38 @@ static List<SegmentZKMetadata> filterSegmentsBasedOnStatus(TableType tableType,
       //    if new records are consumed later, the MergeRollupTask may have already moved watermarks forward, and may
       //    not be able to merge those lately-created segments -- we assume that users will have a way to backfill those
       //    records correctly.
-      long earliestStartTimeMsOfInProgressSegments = Long.MAX_VALUE;
+      Map<Integer, LLCSegmentName> latestCompletedSegmentInEachPartition = new HashMap<>();
+      HashSet<String> filteredSegmentNames = new HashSet<>();
       for (SegmentZKMetadata segmentZKMetadata : allSegments) {
-        if (!segmentZKMetadata.getStatus().isCompleted()
-            && segmentZKMetadata.getTotalDocs() > 0
-            && segmentZKMetadata.getStartTimeMs() < earliestStartTimeMsOfInProgressSegments) {
-          earliestStartTimeMsOfInProgressSegments = segmentZKMetadata.getStartTimeMs();
+        if (segmentZKMetadata.getStatus().isCompleted()) {
+          // completed segments
+          if (LLCSegmentName.isLLCSegment(segmentZKMetadata.getSegmentName())) {
+            // realtime segments
+            LLCSegmentName llcSegmentName = new LLCSegmentName(segmentZKMetadata.getSegmentName());
+            int partitionId = llcSegmentName.getPartitionGroupId();
+            if (!latestCompletedSegmentInEachPartition.containsKey(partitionId)) {
+              // current segment is the latest found
+              latestCompletedSegmentInEachPartition.put(llcSegmentName.getPartitionGroupId(), llcSegmentName);
+            } else {
+              if (llcSegmentName.getSequenceNumber() >
+                      latestCompletedSegmentInEachPartition.get(partitionId).getSequenceNumber()) {
+                // current segment is the latest found
+                filteredSegmentNames.add(latestCompletedSegmentInEachPartition.get(partitionId).getSegmentName());
+                latestCompletedSegmentInEachPartition.put(partitionId, llcSegmentName);
+              } else {
+                // current segment is not the latest
+                filteredSegmentNames.add(llcSegmentName.getSegmentName());
+              }
+            }
+          } else {
+            // other segments: merged segments, uploaded segments, or ingested offline segments
+            filteredSegmentNames.add(segmentZKMetadata.getSegmentName());
+          }
         }
       }
-      final long finalEarliestStartTimeMsOfInProgressSegments = earliestStartTimeMsOfInProgressSegments;
       return allSegments.stream()
-          .filter(segmentZKMetadata -> segmentZKMetadata.getStatus().isCompleted()
-              && segmentZKMetadata.getStartTimeMs() < finalEarliestStartTimeMsOfInProgressSegments)
-          .collect(Collectors.toList());
+              .filter(a->filteredSegmentNames.contains(a.getSegmentName()))
+              .collect(Collectors.toList());

Review Comment:
   We can make this code simpler to read and review by only computing `partitionIdToLatestCompletedSegment`:
   ```java
   Map<Integer, LLCSegmentName> partitionIdToLatestCompletedSegment = new HashMap<>();
   for (SegmentZKMetadata segmentZKMetadata : allSegments) {
     if (segmentZKMetadata.getStatus().isCompleted()) {
       String segmentName = segmentZKMetadata.getSegmentName();
       if (LLCSegmentName.isLLCSegment(segmentName)) {
         LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
         // Keep the completed segment with the highest sequence number per partition
         partitionIdToLatestCompletedSegment.compute(llcSegmentName.getPartitionGroupId(),
             (partitionId, latestSegment) -> {
               if (latestSegment == null) {
                 return llcSegmentName;
               } else {
                 return latestSegment.getSequenceNumber() > llcSegmentName.getSequenceNumber() ? latestSegment
                     : llcSegmentName;
               }
             });
       }
     }
   }
   ```
   Then, at the end, when we go over the elements of `allSegments`, we keep only completed segments and filter out the latest completed segment of each partition.
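   A minimal, self-contained sketch of this two-pass approach, under simplifying assumptions: the hypothetical record `Segment` stands in for `SegmentZKMetadata` (only the segment name and a completed flag), and the hypothetical record `LlcName` stands in for `LLCSegmentName`, with a simplified parser that assumes names of the form `table__partitionGroupId__sequenceNumber__creationTime`. Pass 1 builds the per-partition map with `Map.compute`; pass 2 keeps completed segments except the latest completed one in each partition.

   ```java
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.stream.Collectors;

   final class LatestCompletedSegmentFilter {

     // Hypothetical stand-in for SegmentZKMetadata: name and completed status only.
     record Segment(String name, boolean completed) { }

     // Hypothetical stand-in for LLCSegmentName.
     record LlcName(String name, int partitionGroupId, int sequenceNumber) {
       // Assumes table__partitionGroupId__sequenceNumber__creationTime;
       // returns null for names that do not match (e.g. merged or uploaded segments).
       static LlcName parse(String name) {
         String[] parts = name.split("__");
         if (parts.length != 4) {
           return null;
         }
         try {
           return new LlcName(name, Integer.parseInt(parts[1]), Integer.parseInt(parts[2]));
         } catch (NumberFormatException e) {
           return null;
         }
       }
     }

     static List<Segment> filter(List<Segment> allSegments) {
       // Pass 1: latest completed LLC segment (highest sequence number) per partition.
       Map<Integer, LlcName> partitionIdToLatestCompletedSegment = new HashMap<>();
       for (Segment segment : allSegments) {
         if (!segment.completed()) {
           continue;
         }
         LlcName llcName = LlcName.parse(segment.name());
         if (llcName != null) {
           partitionIdToLatestCompletedSegment.compute(llcName.partitionGroupId(),
               (partitionId, latest) ->
                   (latest == null || llcName.sequenceNumber() > latest.sequenceNumber()) ? llcName : latest);
         }
       }
       // Pass 2: keep completed segments, dropping the latest completed one per partition.
       // Every completed LLC segment was seen in pass 1, so get() is non-null here.
       return allSegments.stream()
           .filter(Segment::completed)
           .filter(segment -> {
             LlcName llcName = LlcName.parse(segment.name());
             return llcName == null
                 || !llcName.name().equals(
                     partitionIdToLatestCompletedSegment.get(llcName.partitionGroupId()).name());
           })
           .collect(Collectors.toList());
     }
   }
   ```

   With completed segments at sequence numbers 0 and 1 in partition 0, an in-progress segment at sequence number 2 in partition 0, a completed segment at sequence number 0 in partition 1, and one completed non-LLC segment, the sketch keeps only partition 0's sequence-0 segment and the non-LLC segment. `Map.merge` with a remapping function that picks the higher sequence number would express the same update without the explicit null check.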
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

