sajjad-moradi commented on code in PR #15177:
URL: https://github.com/apache/pinot/pull/15177#discussion_r1983771523


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java:
##########
@@ -543,44 +545,45 @@ public void validateTaskConfigs(TableConfig tableConfig, Schema schema, Map<Stri
   }
 
   @VisibleForTesting
-  static List<SegmentZKMetadata> filterSegmentsBasedOnStatus(TableType tableType, List<SegmentZKMetadata> allSegments) {
-    if (tableType == TableType.REALTIME) {
-      // For realtime table, don't process
-      // 1. in-progress segments (Segment.Realtime.Status.IN_PROGRESS)
-      // 2. sealed segments with start time later than the earliest start time of all in progress segments
-      // This prevents those in-progress segments from not being merged.
-      //
-      // Note that we make the following two assumptions here:
-      // 1. streaming data consumer lags are negligible
-      // 2. streaming data records are ingested mostly in chronological order (no records are ingested with delay larger
-      //    than bufferTimeMS)
-      //
-      // We don't handle the following cases intentionally because it will be either overkill or too complex
-      // 1. New partition added. If new partitions are not picked up timely, the MergeRollupTask will move watermarks
-      //    forward, and may not be able to merge some lately-created segments for those new partitions -- users should
-      //    configure pinot properly to discover new partitions timely, or they should restart pinot servers manually
-      //    for new partitions to be picked up
-      // 2. (1) no new in-progress segments are created for some partitions (2) new in-progress segments are created for
-      //    partitions, but there is no record consumed (i.e, empty in-progress segments). In those two cases,
-      //    if new records are consumed later, the MergeRollupTask may have already moved watermarks forward, and may
-      //    not be able to merge those lately-created segments -- we assume that users will have a way to backfill those
-      //    records correctly.
-      long earliestStartTimeMsOfInProgressSegments = Long.MAX_VALUE;
-      for (SegmentZKMetadata segmentZKMetadata : allSegments) {
-        if (!segmentZKMetadata.getStatus().isCompleted()
-            && segmentZKMetadata.getTotalDocs() > 0
-            && segmentZKMetadata.getStartTimeMs() < earliestStartTimeMsOfInProgressSegments) {
-          earliestStartTimeMsOfInProgressSegments = segmentZKMetadata.getStartTimeMs();
-        }
+  static List<SegmentZKMetadata> filterSegmentsforRealtimeTable(List<SegmentZKMetadata> allSegments) {
+    // For realtime table, don't process
+    // 1. in-progress segments (Segment.Realtime.Status.IN_PROGRESS), this has been taken care of in
+    //    getNonConsumingSegmentsZKMetadataForRealtimeTable()
+    // 2. most recent sealed segments in each partition, this prevents those paused segments from being merged.
+    //
+    // Note that we make the following two assumptions here:
+    // 1. streaming data consumer lags are negligible
+    // 2. streaming data records are ingested mostly in chronological order (no records are ingested with delay larger
+    //    than bufferTimeMS)
+    //
+    // We don't handle the following cases intentionally because it will be either overkill or too complex
+    // 1. New partition added. If new partitions are not picked up timely, the MergeRollupTask will move watermarks
+    //    forward, and may not be able to merge some lately-created segments for those new partitions -- users should
+    //    configure pinot properly to discover new partitions timely, or they should restart pinot servers manually
+    //    for new partitions to be picked up
+    // 2. (1) no new in-progress segments are created for some partitions (2) new in-progress segments are created for
+    //    partitions, but there is no record consumed (i.e, empty in-progress segments). In those two cases,
+    //    if new records are consumed later, the MergeRollupTask may have already moved watermarks forward, and may
+    //    not be able to merge those lately-created segments -- we assume that users will have a way to backfill those
+    //    records correctly.
+    Map<Integer, String> partitionIdToLatestCompletedSegment = new HashMap<>();
+    for (SegmentZKMetadata segmentZKMetadata : allSegments) {
+      String segmentName = segmentZKMetadata.getSegmentName();
+      if (LLCSegmentName.isLLCSegment(segmentName)) {
+        LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+        partitionIdToLatestCompletedSegment.compute(llcSegmentName.getPartitionGroupId(), (partId, latestSegment) -> {
+          if (latestSegment == null) {
+            return segmentName;
+          } else {
+            return new LLCSegmentName(latestSegment).getSequenceNumber() > llcSegmentName.getSequenceNumber()
+                    ? latestSegment : segmentName;
+          }
+        });
       }
-      final long finalEarliestStartTimeMsOfInProgressSegments = earliestStartTimeMsOfInProgressSegments;
-      return allSegments.stream()
-          .filter(segmentZKMetadata -> segmentZKMetadata.getStatus().isCompleted()
-              && segmentZKMetadata.getStartTimeMs() < finalEarliestStartTimeMsOfInProgressSegments)
-          .collect(Collectors.toList());
-    } else {
-      return allSegments;
     }
+    return allSegments.stream()
+            .filter(a -> !partitionIdToLatestCompletedSegment.containsValue(a.getSegmentName()))

Review Comment:
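   I believe `.containsValue` is O(N) in the size of the map: `HashMap.containsValue` scans every entry, and here it runs once per segment inside the filter. Could you please put the latest completed segments in a `HashSet` right before the return statement?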
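A minimal sketch of the suggested change, written stand-alone so it runs without Pinot on the classpath. It assumes every segment name follows the LLC convention `table__partitionGroupId__sequenceNumber__creationTime`; `LatestSegmentFilter`, `partitionOf` and `sequenceOf` are hypothetical stand-ins for `SegmentZKMetadata` and `LLCSegmentName`, not Pinot APIs. The per-partition maximum is computed as in the PR, then the winners are copied into a `HashSet` once, so the membership check inside the filter is O(1) on average instead of a full scan of the map.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch: drop the latest completed segment of each partition,
// using a HashSet lookup instead of Map.containsValue.
public class LatestSegmentFilter {

  // Hypothetical stand-in for LLCSegmentName#getPartitionGroupId (assumes "__" separators).
  static int partitionOf(String segmentName) {
    return Integer.parseInt(segmentName.split("__")[1]);
  }

  // Hypothetical stand-in for LLCSegmentName#getSequenceNumber.
  static int sequenceOf(String segmentName) {
    return Integer.parseInt(segmentName.split("__")[2]);
  }

  static List<String> filter(List<String> completedSegmentNames) {
    // Pass 1: keep the highest-sequence segment name per partition.
    Map<Integer, String> latestByPartition = new HashMap<>();
    for (String name : completedSegmentNames) {
      latestByPartition.merge(partitionOf(name), name,
          (current, candidate) -> sequenceOf(current) >= sequenceOf(candidate) ? current : candidate);
    }
    // Copy the winners into a set once; contains() is then O(1) on average.
    Set<String> latestSegments = new HashSet<>(latestByPartition.values());
    // Pass 2: keep every segment that is not the newest one in its partition.
    return completedSegmentNames.stream()
        .filter(name -> !latestSegments.contains(name))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> segments = List.of(
        "testTable__0__0__20250224T0900Z",
        "testTable__0__1__20250224T1000Z",
        "testTable__1__0__20250224T0900Z");
    System.out.println(filter(segments));
  }
}
```

Running `main` prints `[testTable__0__0__20250224T0900Z]`: sequence 1 is the newest segment in partition 0, and the only segment of partition 1 is also its newest, so both are held back. The set costs one extra entry per partition and makes the filter pass O(N) on average for N segments, rather than O(N × P) with P partitions.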



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java:
##########
@@ -283,6 +282,46 @@ public void testGenerateTasksCheckConfigs() {
     assertTrue(pinotTaskConfigs.isEmpty());
   }
 
+  /**
+   * Test pre-filter of task generation
+   */
+  @Test
+  public void testFilterSegmentsforRealtimeTable() {
+    ClusterInfoAccessor mockClusterInfoProvide = mock(ClusterInfoAccessor.class);
+
+    when(mockClusterInfoProvide.getTaskStates(MinionConstants.MergeRollupTask.TASK_TYPE)).thenReturn(new HashMap<>());
+    // construct 3 following segments, among these, only 0_0 can be scheduled, others should be filtered out
+    // partition 0, completed 0
+    SegmentZKMetadata realtimeTableSegmentMetadata1 =
+            getSegmentZKMetadata("testTable__0__0__0", 5000, 6000, TimeUnit.MILLISECONDS,

Review Comment:
   nit: `s/testTable__0__0__0/testTable__0__0__20250224T0900Z/`
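For context on the nit: Pinot LLC segment names have the shape `table__partitionGroupId__sequenceNumber__creationTime`, and the suggested name encodes the creation time as `20250224T0900Z`. The sketch below builds such a name for a test fixture. `LlcSegmentNames` and `segmentName` are hypothetical helpers, and the `yyyyMMdd'T'HHmm'Z'` UTC pattern is an assumption inferred from the reviewer's example; inside the real test, Pinot's own `LLCSegmentName` class is probably the better tool, and this version only illustrates the format.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical fixture helper: builds table__partition__sequence__creationTime names.
public class LlcSegmentNames {

  // Assumed creation-time pattern, inferred from the example "20250224T0900Z" (UTC).
  private static final DateTimeFormatter CREATION_TIME_FORMAT =
      DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmm'Z'").withZone(ZoneOffset.UTC);

  static String segmentName(String table, int partition, int sequence, long creationTimeMs) {
    // Join the four fields with the "__" separator used by LLC segment names.
    return String.join("__", table, String.valueOf(partition), String.valueOf(sequence),
        CREATION_TIME_FORMAT.format(Instant.ofEpochMilli(creationTimeMs)));
  }

  public static void main(String[] args) {
    long creationTimeMs = Instant.parse("2025-02-24T09:00:00Z").toEpochMilli();
    System.out.println(segmentName("testTable", 0, 0, creationTimeMs));
  }
}
```

Running `main` prints `testTable__0__0__20250224T0900Z`, the name suggested in the nit.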



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


For additional commands, e-mail: commits-h...@pinot.apache.org
