snleee commented on code in PR #9890:
URL: https://github.com/apache/pinot/pull/9890#discussion_r1041352399


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java:
##########
@@ -430,22 +448,70 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
     return pinotTaskConfigs;
   }
 
+  @VisibleForTesting
+  static List<SegmentZKMetadata> filterSegmentsBasedOnStatus(TableType tableType, List<SegmentZKMetadata> allSegments) {
+    if (tableType == TableType.REALTIME) {
+      // For realtime tables, don't process:
+      // 1. in-progress segments (Segment.Realtime.Status.IN_PROGRESS)
+      // 2. sealed segments whose start time is no earlier than the earliest start time of all non-empty in-progress segments
+      // This prevents the data in those in-progress segments from being left unmerged once they complete.
+      //
+      // Note that we make the following two assumptions here:
+      // 1. streaming data consumer lags are negligible
+      // 2. streaming data records are ingested mostly in chronological order (no records are ingested with a delay larger
+      //    than bufferTimeMS)
+      //
+      // We intentionally don't handle the following cases because doing so would be either overkill or too complex:
+      // 1. New partitions are added. If new partitions are not picked up in a timely manner, the MergeRollupTask will move
+      //    watermarks forward and may not be able to merge some lately-created segments for those new partitions. Users
+      //    should configure Pinot to discover new partitions in a timely manner, or restart Pinot servers manually so
+      //    that the new partitions are picked up.
+      // 2. (1) No new in-progress segments are created for some partitions, or (2) new in-progress segments are created
+      //    for those partitions but no records are consumed (i.e., the in-progress segments are empty). In both cases,
+      //    if new records are consumed later, the MergeRollupTask may already have moved the watermarks forward and may
+      //    not be able to merge those lately-created segments. We assume that users have a way to backfill those
+      //    records correctly.
+      long earliestStartTimeMsOfInProgressSegments = Long.MAX_VALUE;
+      for (SegmentZKMetadata segmentZKMetadata : allSegments) {
+        if (!segmentZKMetadata.getStatus().isCompleted()
+            && segmentZKMetadata.getTotalDocs() > 0
+            && segmentZKMetadata.getStartTimeMs() < earliestStartTimeMsOfInProgressSegments) {
+          earliestStartTimeMsOfInProgressSegments = segmentZKMetadata.getStartTimeMs();
+        }
+      }
+      final long finalEarliestStartTimeMsOfInProgressSegments = earliestStartTimeMsOfInProgressSegments;
+      return allSegments.stream()
+              .filter(segmentZKMetadata -> segmentZKMetadata.getStatus().isCompleted()
+                  && segmentZKMetadata.getStartTimeMs() < finalEarliestStartTimeMsOfInProgressSegments)
+              .collect(Collectors.toList());
+    } else {
+      return allSegments;
+    }
+  }
+
   /**
    * Validate table config for merge/rollup task
    */
-  private boolean validate(TableConfig tableConfig, String taskType) {
-    String offlineTableName = tableConfig.getTableName();
-    if (tableConfig.getTableType() != TableType.OFFLINE) {
-      LOGGER.warn("Skip generating task: {} for non-OFFLINE table: {}, REALTIME table is not supported yet", taskType,
-          offlineTableName);
-      return false;
-    }
-
+  @VisibleForTesting
+  static boolean validate(TableConfig tableConfig, String taskType) {
+    String tableName = tableConfig.getTableName();

Review Comment:
   `tableNameWithType`? lol :) 
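The REALTIME branch of `filterSegmentsBasedOnStatus` can be illustrated with a small standalone sketch. This is not Pinot code: `Segment` is a hypothetical stand-in for `SegmentZKMetadata` that keeps only the fields the filter reads (completion status, start time, doc count), and the segment names and timestamps are made up. It shows two consequences of the rule in the diff: empty in-progress segments do not affect the cutoff, and sealed segments starting at or after the earliest non-empty in-progress segment are skipped.

```java
import java.util.List;
import java.util.stream.Collectors;

public class SegmentStatusFilterSketch {

  // Hypothetical stand-in for SegmentZKMetadata: only the fields the filter reads.
  record Segment(String name, boolean completed, long startTimeMs, long totalDocs) { }

  // Sketch of the REALTIME branch of filterSegmentsBasedOnStatus from the diff above.
  static List<Segment> filterRealtime(List<Segment> allSegments) {
    // Earliest start time among non-empty in-progress segments; Long.MAX_VALUE if there are none.
    long earliestInProgressStartMs = Long.MAX_VALUE;
    for (Segment s : allSegments) {
      if (!s.completed() && s.totalDocs() > 0 && s.startTimeMs() < earliestInProgressStartMs) {
        earliestInProgressStartMs = s.startTimeMs();
      }
    }
    final long cutoffMs = earliestInProgressStartMs;
    // Keep only completed segments that start strictly before the cutoff.
    return allSegments.stream()
        .filter(s -> s.completed() && s.startTimeMs() < cutoffMs)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // Made-up segments for illustration only.
    List<Segment> segments = List.of(
        new Segment("p0_seq0", true, 1_000L, 50),   // sealed, before the cutoff: kept
        new Segment("p0_seq1", false, 5_000L, 20),  // in progress and non-empty: cutoff = 5000
        new Segment("p1_seq0", true, 3_000L, 40),   // sealed, before the cutoff: kept
        new Segment("p1_seq1", true, 6_000L, 30),   // sealed, but starts after the cutoff: skipped
        new Segment("p2_seq0", false, 2_000L, 0));  // in progress but empty: ignored for the cutoff
    List<String> kept = filterRealtime(segments).stream()
        .map(Segment::name)
        .collect(Collectors.toList());
    System.out.println(kept); // prints [p0_seq0, p1_seq0]
  }
}
```

If the empty segment `p2_seq0` were counted, the cutoff would drop to 2000 and `p1_seq0` would be skipped too. Ignoring empty in-progress segments is what produces case 2 in the code comment: data consumed into them later may land behind an already-advanced watermark.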



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

