This is an automated email from the ASF dual-hosted git repository.

sajjad pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a39ad22d7a Change Pre-filter logic in MergeRollup task (#15177)
a39ad22d7a is described below

commit a39ad22d7ae8489f486c359bdd2656a4a8585adb
Author: Quan Yuan (Kate) <110874306+kate...@users.noreply.github.com>
AuthorDate: Fri Mar 7 13:06:24 2025 -0800

    Change Pre-filter logic in MergeRollup task (#15177)
---
 .../MergeRollupMinionClusterIntegrationTest.java   |  4 +-
 .../mergerollup/MergeRollupTaskGenerator.java      | 91 ++++++++++++----------
 .../mergerollup/MergeRollupTaskGeneratorTest.java  | 62 +++++++++++++--
 3 files changed, 108 insertions(+), 49 deletions(-)

diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
index 71a55da67d..e7b51c6064 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
@@ -1031,8 +1031,8 @@ public class MergeRollupMinionClusterIntegrationTest 
extends BaseClusterIntegrat
 
     String sqlQuery = "SELECT count(*) FROM " + tableName;
     JsonNode expectedJson = postQuery(sqlQuery);
-    long[] expectedNumBucketsToProcess100Days = {3, 2, 1, 0, 3, 2, 1, 0};
-    long[] expectedNumBucketsToProcess200Days = {0, 0, 1, 1, 0, 0, 1, 1};
+    long[] expectedNumBucketsToProcess100Days = {2, 1, 0, 0, 3, 2, 1, 0};
+    long[] expectedNumBucketsToProcess200Days = {0, 0, 2, 1, 0, 0, 1, 1};
     String realtimeTableName = 
TableNameBuilder.REALTIME.tableNameWithType(tableName);
     int numTasks = 0;
     TaskSchedulingContext context = new TaskSchedulingContext()
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
index 10dcfcbf66..c4f9bd2175 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
@@ -42,6 +42,7 @@ import 
org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.minion.MergeRollupTaskMetadata;
+import org.apache.pinot.common.utils.LLCSegmentName;
 import 
org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator;
 import 
org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
 import 
org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtils;
@@ -161,21 +162,22 @@ public class MergeRollupTaskGenerator extends 
BaseTaskGenerator {
       LOGGER.info("Start generating task configs for table: {} for task: {}", 
tableNameWithType, taskType);
 
       // Get all segment metadata
-      List<SegmentZKMetadata> allSegments = 
getSegmentsZKMetadataForTable(tableNameWithType);
-      // Filter segments based on status
-      List<SegmentZKMetadata> preSelectedSegmentsBasedOnStatus
-          = filterSegmentsBasedOnStatus(tableConfig.getTableType(), 
allSegments);
+      List<SegmentZKMetadata> allSegments =
+              tableConfig.getTableType() == TableType.OFFLINE
+                      ? getSegmentsZKMetadataForTable(tableNameWithType)
+                      : filterSegmentsforRealtimeTable(
+                              
getNonConsumingSegmentsZKMetadataForRealtimeTable(tableNameWithType));
 
       // Select current segment snapshot based on lineage, filter out empty 
segments
       SegmentLineage segmentLineage = 
_clusterInfoAccessor.getSegmentLineage(tableNameWithType);
       Set<String> preSelectedSegmentsBasedOnLineage = new HashSet<>();
-      for (SegmentZKMetadata segment : preSelectedSegmentsBasedOnStatus) {
+      for (SegmentZKMetadata segment : allSegments) {
         preSelectedSegmentsBasedOnLineage.add(segment.getSegmentName());
       }
       
SegmentLineageUtils.filterSegmentsBasedOnLineageInPlace(preSelectedSegmentsBasedOnLineage,
 segmentLineage);
 
       List<SegmentZKMetadata> preSelectedSegments = new ArrayList<>();
-      for (SegmentZKMetadata segment : preSelectedSegmentsBasedOnStatus) {
+      for (SegmentZKMetadata segment : allSegments) {
         if 
(preSelectedSegmentsBasedOnLineage.contains(segment.getSegmentName()) && 
segment.getTotalDocs() > 0
             && MergeTaskUtils.allowMerge(segment)) {
           preSelectedSegments.add(segment);
@@ -543,44 +545,49 @@ public class MergeRollupTaskGenerator extends 
BaseTaskGenerator {
   }
 
   @VisibleForTesting
-  static List<SegmentZKMetadata> filterSegmentsBasedOnStatus(TableType 
tableType, List<SegmentZKMetadata> allSegments) {
-    if (tableType == TableType.REALTIME) {
-      // For realtime table, don't process
-      // 1. in-progress segments (Segment.Realtime.Status.IN_PROGRESS)
-      // 2. sealed segments with start time later than the earliest start time 
of all in progress segments
-      // This prevents those in-progress segments from not being merged.
-      //
-      // Note that we make the following two assumptions here:
-      // 1. streaming data consumer lags are negligible
-      // 2. streaming data records are ingested mostly in chronological order 
(no records are ingested with delay larger
-      //    than bufferTimeMS)
-      //
-      // We don't handle the following cases intentionally because it will be 
either overkill or too complex
-      // 1. New partition added. If new partitions are not picked up timely, 
the MergeRollupTask will move watermarks
-      //    forward, and may not be able to merge some lately-created segments 
for those new partitions -- users should
-      //    configure pinot properly to discover new partitions timely, or 
they should restart pinot servers manually
-      //    for new partitions to be picked up
-      // 2. (1) no new in-progress segments are created for some partitions 
(2) new in-progress segments are created for
-      //    partitions, but there is no record consumed (i.e, empty 
in-progress segments). In those two cases,
-      //    if new records are consumed later, the MergeRollupTask may have 
already moved watermarks forward, and may
-      //    not be able to merge those lately-created segments -- we assume 
that users will have a way to backfill those
-      //    records correctly.
-      long earliestStartTimeMsOfInProgressSegments = Long.MAX_VALUE;
-      for (SegmentZKMetadata segmentZKMetadata : allSegments) {
-        if (!segmentZKMetadata.getStatus().isCompleted()
-            && segmentZKMetadata.getTotalDocs() > 0
-            && segmentZKMetadata.getStartTimeMs() < 
earliestStartTimeMsOfInProgressSegments) {
-          earliestStartTimeMsOfInProgressSegments = 
segmentZKMetadata.getStartTimeMs();
-        }
+  static List<SegmentZKMetadata> 
filterSegmentsforRealtimeTable(List<SegmentZKMetadata> allSegments) {
+    // For realtime table, don't process
+    // 1. in-progress segments (Segment.Realtime.Status.IN_PROGRESS); this has 
been taken care of in
+    //    getNonConsumingSegmentsZKMetadataForRealtimeTable()
+    // 2. the most recent sealed segment in each partition; this prevents those 
paused segments from being merged.
+    //
+    // Note that we make the following two assumptions here:
+    // 1. streaming data consumer lags are negligible
+    // 2. streaming data records are ingested mostly in chronological order 
(no records are ingested with delay larger
+    //    than bufferTimeMS)
+    //
+    // We don't handle the following cases intentionally because it will be 
either overkill or too complex
+    // 1. New partition added. If new partitions are not picked up timely, the 
MergeRollupTask will move watermarks
+    //    forward, and may not be able to merge some lately-created segments 
for those new partitions -- users should
+    //    configure pinot properly to discover new partitions timely, or they 
should restart pinot servers manually
+    //    for new partitions to be picked up
+    // 2. (1) no new in-progress segments are created for some partitions (2) 
new in-progress segments are created for
+    //    partitions, but there is no record consumed (i.e., empty in-progress 
segments). In those two cases,
+    //    if new records are consumed later, the MergeRollupTask may have 
already moved watermarks forward, and may
+    //    not be able to merge those lately-created segments -- we assume that 
users will have a way to backfill those
+    //    records correctly.
+    Map<Integer, LLCSegmentName> partitionIdToLatestCompletedSegment = new 
HashMap<>();
+    for (SegmentZKMetadata segmentZKMetadata : allSegments) {
+      String segmentName = segmentZKMetadata.getSegmentName();
+      if (LLCSegmentName.isLLCSegment(segmentName)) {
+        LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+        
partitionIdToLatestCompletedSegment.compute(llcSegmentName.getPartitionGroupId(),
 (partId, latestSegment) -> {
+          if (latestSegment == null) {
+            return llcSegmentName;
+          } else {
+            return latestSegment.getSequenceNumber() > 
llcSegmentName.getSequenceNumber()
+                    ? latestSegment : llcSegmentName;
+          }
+        });
       }
-      final long finalEarliestStartTimeMsOfInProgressSegments = 
earliestStartTimeMsOfInProgressSegments;
-      return allSegments.stream()
-          .filter(segmentZKMetadata -> 
segmentZKMetadata.getStatus().isCompleted()
-              && segmentZKMetadata.getStartTimeMs() < 
finalEarliestStartTimeMsOfInProgressSegments)
-          .collect(Collectors.toList());
-    } else {
-      return allSegments;
     }
+    Set<String> filteredSegmentNames = new HashSet<>();
+    for (LLCSegmentName llcSegmentName : 
partitionIdToLatestCompletedSegment.values()) {
+      filteredSegmentNames.add(llcSegmentName.getSegmentName());
+    }
+    return allSegments.stream()
+            .filter(a -> !filteredSegmentNames.contains(a.getSegmentName()))
+            .collect(Collectors.toList());
   }
 
   /**
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
index 18d60d0f9b..7665e7308f 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
@@ -246,7 +246,7 @@ public class MergeRollupTaskGeneratorTest {
     // the two following segments will be skipped when generating tasks
     SegmentZKMetadata realtimeTableSegmentMetadata1 =
         getSegmentZKMetadata("testTable__0__0__0", 5000, 50_000, 
TimeUnit.MILLISECONDS, null);
-    
realtimeTableSegmentMetadata1.setStatus(CommonConstants.Segment.Realtime.Status.IN_PROGRESS);
+    
realtimeTableSegmentMetadata1.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
     SegmentZKMetadata realtimeTableSegmentMetadata2 =
         getSegmentZKMetadata("testTable__1__0__0", 5000, 50_000, 
TimeUnit.MILLISECONDS, null);
     
when(mockClusterInfoProvide.getSegmentsZKMetadata(REALTIME_TABLE_NAME)).thenReturn(
@@ -266,15 +266,14 @@ public class MergeRollupTaskGeneratorTest {
 
     // Skip task generation, if the table is a realtime table and all segments 
are skipped
     // We don't test realtime REFRESH table because this combination does not 
make sense
-    
assertTrue(MergeRollupTaskGenerator.filterSegmentsBasedOnStatus(TableType.REALTIME,
-        Lists.newArrayList(realtimeTableSegmentMetadata1, 
realtimeTableSegmentMetadata2)).isEmpty());
+    assertTrue(MergeRollupTaskGenerator.filterSegmentsforRealtimeTable(
+            Lists.newArrayList(realtimeTableSegmentMetadata1, 
realtimeTableSegmentMetadata2)
+    ).isEmpty());
     TableConfig realtimeTableConfig = getTableConfig(TableType.REALTIME, new 
HashMap<>());
     List<PinotTaskConfig> pinotTaskConfigs = 
generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
     assertTrue(pinotTaskConfigs.isEmpty());
 
     // Skip task generation, if the table is an offline REFRESH table
-    
assertFalse(MergeRollupTaskGenerator.filterSegmentsBasedOnStatus(TableType.OFFLINE,
-        Lists.newArrayList(offlineTableSegmentMetadata)).isEmpty());
     IngestionConfig ingestionConfig = new IngestionConfig();
     ingestionConfig.setBatchIngestionConfig(new BatchIngestionConfig(null, 
"REFRESH", null));
     TableConfig offlineTableConfig = getTableConfig(TableType.OFFLINE, new 
HashMap<>());
@@ -283,6 +282,46 @@ public class MergeRollupTaskGeneratorTest {
     assertTrue(pinotTaskConfigs.isEmpty());
   }
 
+  /**
+   * Test pre-filter of task generation
+   */
+  @Test
+  public void testFilterSegmentsforRealtimeTable() {
+    ClusterInfoAccessor mockClusterInfoProvide = 
mock(ClusterInfoAccessor.class);
+
+    
when(mockClusterInfoProvide.getTaskStates(MinionConstants.MergeRollupTask.TASK_TYPE)).thenReturn(new
 HashMap<>());
+    // Construct the following 3 segments; among these, only 0_0 can be scheduled, the 
others should be filtered out
+    // partition 0, completed 0
+    SegmentZKMetadata realtimeTableSegmentMetadata1 =
+            getSegmentZKMetadata("testTable__0__0__20250224T0900Z", 5000, 
6000, TimeUnit.MILLISECONDS,
+                    null, "50000", "60000");
+    
realtimeTableSegmentMetadata1.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+    // partition 0, completed 1
+    SegmentZKMetadata realtimeTableSegmentMetadata2 =
+            getSegmentZKMetadata("testTable__0__1__20250224T0902Z", 6000, 
7000, TimeUnit.MILLISECONDS,
+                    null, "60000", "70000");
+    
realtimeTableSegmentMetadata2.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+    // partition 1, completed 0
+    SegmentZKMetadata realtimeTableSegmentMetadata3 =
+            getSegmentZKMetadata("testTable__1__0__20250224T0900Z", 5500, 
6500, TimeUnit.MILLISECONDS,
+                    null, "55000", "65000");
+    
realtimeTableSegmentMetadata3.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+    
when(mockClusterInfoProvide.getSegmentsZKMetadata(REALTIME_TABLE_NAME)).thenReturn(
+            Lists.newArrayList(realtimeTableSegmentMetadata1, 
realtimeTableSegmentMetadata2,
+                    realtimeTableSegmentMetadata3));
+    when(mockClusterInfoProvide.getIdealState(REALTIME_TABLE_NAME)).thenReturn(
+            getIdealState(REALTIME_TABLE_NAME, 
Lists.newArrayList("testTable__0", "server0", "ONLINE")));
+
+    MergeRollupTaskGenerator generator = new MergeRollupTaskGenerator();
+    generator.init(mockClusterInfoProvide);
+
+    List<SegmentZKMetadata> filterResult = 
MergeRollupTaskGenerator.filterSegmentsforRealtimeTable(
+            Lists.newArrayList(realtimeTableSegmentMetadata1, 
realtimeTableSegmentMetadata2,
+                    realtimeTableSegmentMetadata3));
+    assertEquals(filterResult.size(), 1);
+    assertEquals(filterResult.get(0).getSegmentName(), 
"testTable__0__0__20250224T0900Z");
+  }
+
   private void checkPinotTaskConfig(Map<String, String> pinotTaskConfig, 
String segments, String mergeLevel,
       String mergeType, String partitionBucketTimePeriod, String 
roundBucketTimePeriod,
       String maxNumRecordsPerSegments) {
@@ -1034,6 +1073,19 @@ public class MergeRollupTaskGeneratorTest {
     return segmentZKMetadata;
   }
 
+  private SegmentZKMetadata getSegmentZKMetadata(String segmentName, long 
startTime, long endTime, TimeUnit timeUnit,
+                                                 String downloadURL, String 
startOffset, String endOffset) {
+    SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(segmentName);
+    segmentZKMetadata.setStartTime(startTime);
+    segmentZKMetadata.setEndTime(endTime);
+    segmentZKMetadata.setTimeUnit(timeUnit);
+    segmentZKMetadata.setDownloadUrl(downloadURL);
+    segmentZKMetadata.setTotalDocs(1000);
+    segmentZKMetadata.setStartOffset(startOffset);
+    segmentZKMetadata.setEndOffset(endOffset);
+    return segmentZKMetadata;
+  }
+
   private IdealState getIdealState(String tableName, List<String> 
segmentNames) {
     IdealState idealState = new IdealState(tableName);
     idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to