swaminathanmanish commented on code in PR #14623:
URL: https://github.com/apache/pinot/pull/14623#discussion_r1900822037


##########
pinot-common/src/main/java/org/apache/pinot/common/minion/ExpectedSubtaskResult.java:
##########
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.minion;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.UUID;
+
+
+/**
+ * ExpectedRealtimeOfflineTaskResult is created in
+ * {@link org.apache.pinot.plugin.minion.tasks.realtimetoofflinesegments.RealtimeToOfflineSegmentsTaskExecutor}
+ * before uploading offline segment(s) to the offline table.
+ *
+ *  The <code>_segmentsFrom</code> denotes the input realtime segments.
+ *  The <code>_segmentsTo</code> denotes the expected offline segments.
+ *  The <code>_id</code> denotes the unique identifier of the object.
+ *  The <code>_taskID</code> denotes the minion taskId.
+ *  The <code>_taskFailure</code> denotes the failure status of the minion task handling the
+ *    current ExpectedResult. This is modified in
+ *    {@link org.apache.pinot.plugin.minion.tasks.realtimetoofflinesegments.RealtimeToOfflineSegmentsTaskGenerator}
+ *    when a previous minion task has failed.
+ *
+ */
+public class ExpectedSubtaskResult {

Review Comment:
   Is this applicable to other tasks? If not, I'd suggest moving this data structure to an RTO-specific module.



##########
pinot-common/src/main/java/org/apache/pinot/common/minion/ExpectedSubtaskResult.java:
##########
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.minion;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.UUID;
+
+
+/**
+ * ExpectedRealtimeOfflineTaskResult is created in
+ * {@link org.apache.pinot.plugin.minion.tasks.realtimetoofflinesegments.RealtimeToOfflineSegmentsTaskExecutor}
+ * before uploading offline segment(s) to the offline table.
+ *
+ *  The <code>_segmentsFrom</code> denotes the input realtime segments.
+ *  The <code>_segmentsTo</code> denotes the expected offline segments.
+ *  The <code>_id</code> denotes the unique identifier of the object.
+ *  The <code>_taskID</code> denotes the minion taskId.
+ *  The <code>_taskFailure</code> denotes the failure status of the minion task handling the
+ *    current ExpectedResult. This is modified in
+ *    {@link org.apache.pinot.plugin.minion.tasks.realtimetoofflinesegments.RealtimeToOfflineSegmentsTaskGenerator}
+ *    when a previous minion task has failed.
+ *
+ */
+public class ExpectedSubtaskResult {
+  private final List<String> _segmentsFrom;
+  private final List<String> _segmentsTo;
+  private final String _id;
+  private final String _taskID;
+  private boolean _taskFailure = false;
+
+  public ExpectedSubtaskResult(List<String> segmentsFrom, List<String> segmentsTo, String taskID) {
+    _segmentsFrom = segmentsFrom;
+    _segmentsTo = segmentsTo;
+    _taskID = taskID;
+    _id = UUID.randomUUID().toString();
+  }
+
+  public ExpectedSubtaskResult(List<String> segmentsFrom, List<String> segmentsTo,
+      String id, String taskID, boolean taskFailure) {
+    _segmentsFrom = segmentsFrom;
+    _segmentsTo = segmentsTo;
+    _id = id;
+    _taskID = taskID;
+    _taskFailure = taskFailure;
+  }
+
+  public String getTaskID() {

Review Comment:
   Can this itself be the unique ID instead of adding a new _id field? A sketch of what that could look like is below.
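   For illustration only, assuming the minion task/subtask ID handed to the executor is already unique per subtask (that assumption needs confirming); the getId() accessor here is hypothetical:

```java
import java.util.List;

// Sketch: drop the separate _id field and key everything off the minion task ID.
public class ExpectedSubtaskResult {
  private final List<String> _segmentsFrom;
  private final List<String> _segmentsTo;
  private final String _taskID;
  private boolean _taskFailure = false;

  public ExpectedSubtaskResult(List<String> segmentsFrom, List<String> segmentsTo, String taskID) {
    _segmentsFrom = segmentsFrom;
    _segmentsTo = segmentsTo;
    _taskID = taskID;
  }

  // The task ID doubles as the unique identifier.
  public String getId() {
    return _taskID;
  }
}
```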



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java:
##########
@@ -194,19 +205,54 @@ protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig,
 
   /**
    * Fetches the RealtimeToOfflineSegmentsTask metadata ZNode for the realtime table.
-   * Checks that the version of the ZNode matches with the version cached earlier. If yes, proceeds to update
-   * watermark in the ZNode
-   * TODO: Making the minion task update the ZK metadata is an anti-pattern, however cannot see another way to do it
+   * Before uploading the segments, updates the metadata with the expected results
+   * of the successful execution of current subtask.
+   * The expected result updated in metadata is read by the next iteration of Task Generator
+   * to ensure data correctness.
    */
   @Override
-  public void postProcess(PinotTaskConfig pinotTaskConfig) {
-    Map<String, String> configs = pinotTaskConfig.getConfigs();
-    String realtimeTableName = configs.get(MinionConstants.TABLE_NAME_KEY);
-    long waterMarkMs = Long.parseLong(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY));
-    RealtimeToOfflineSegmentsTaskMetadata newMinionMetadata =
-        new RealtimeToOfflineSegmentsTaskMetadata(realtimeTableName, waterMarkMs);
-    _minionTaskZkMetadataManager.setTaskMetadataZNRecord(newMinionMetadata, RealtimeToOfflineSegmentsTask.TASK_TYPE,
-        _expectedVersion);
+  protected void preUploadSegments(SegmentUploadContext context)
+      throws Exception {
+    super.preUploadSegments(context);
+    String realtimeTableName = context.getTableNameWithType();
+    int attemptCount;
+    try {
+      attemptCount = DEFAULT_RETRY_POLICY.attempt(() -> {
+        try {
+          ZNRecord realtimeToOfflineSegmentsTaskZNRecord =
+              _minionTaskZkMetadataManager.getTaskMetadataZNRecord(realtimeTableName,
+                  RealtimeToOfflineSegmentsTask.TASK_TYPE);
+          int expectedVersion = realtimeToOfflineSegmentsTaskZNRecord.getVersion();
+
+          // Adding ExpectedRealtimeToOfflineSegmentsTaskResult might fail.
+          // In-case of failure there will be runtime exception thrown
+          RealtimeToOfflineSegmentsTaskMetadata updatedRealtimeToOfflineSegmentsTaskMetadata =
+              getUpdatedTaskMetadata(context, realtimeToOfflineSegmentsTaskZNRecord);
+
+          // Setting to zookeeper might fail due to version mismatch, but in this case
+          // the exception is caught and retried.
+          _minionTaskZkMetadataManager.setTaskMetadataZNRecord(updatedRealtimeToOfflineSegmentsTaskMetadata,
+              RealtimeToOfflineSegmentsTask.TASK_TYPE,
+              expectedVersion);
+          return true;
+        } catch (ZkBadVersionException e) {
+          LOGGER.info(
+              "Version changed while updating num of subtasks left in RTO task metadata for table: {}, Retrying.",
+              realtimeTableName);
+          return false;
+        }
+      });
+    } catch (Exception e) {
+      String errorMsg =
+          String.format("Failed to update the RealtimeToOfflineSegmentsTaskMetadata during preUploadSegments. "

Review Comment:
   Can you enhance this to something like "failed to update ZK after N attempts", so that we can tell from the log that it's a ZK update failure?
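   A rough sketch of the kind of message meant here. It reuses realtimeTableName and the caught exception from the surrounding preUploadSegments code; maxAttempts is a placeholder for whatever attempt count the retry policy exposes:

```java
// Sketch: make it obvious from the log that the ZK metadata write is what failed, and after how many attempts.
String errorMsg = String.format(
    "Failed to write RealtimeToOfflineSegmentsTaskMetadata to ZK in preUploadSegments for table: %s "
        + "after %d attempts", realtimeTableName, maxAttempts);
LOGGER.error(errorMsg, e);
throw new RuntimeException(errorMsg, e);
```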



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java:
##########
@@ -147,97 +157,385 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
       long bucketMs = TimeUtils.convertPeriodToMillis(bucketTimePeriod);
       long bufferMs = TimeUtils.convertPeriodToMillis(bufferTimePeriod);
 
-      // Get watermark from RealtimeToOfflineSegmentsTaskMetadata ZNode. 
WindowStart = watermark. WindowEnd =
-      // windowStart + bucket.
-      long windowStartMs = getWatermarkMs(realtimeTableName, 
completedSegmentsZKMetadata, bucketMs);
-      long windowEndMs = windowStartMs + bucketMs;
+      ZNRecord realtimeToOfflineZNRecord =
+          
_clusterInfoAccessor.getMinionTaskMetadataZNRecord(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
+              realtimeTableName);
+      int expectedVersion = realtimeToOfflineZNRecord != null ? 
realtimeToOfflineZNRecord.getVersion() : -1;
+      RealtimeToOfflineSegmentsTaskMetadata 
realtimeToOfflineSegmentsTaskMetadata =
+          getRTOTaskMetadata(realtimeTableName, 
completedRealtimeSegmentsZKMetadata, bucketMs,
+              realtimeToOfflineZNRecord);
+
+      // Get watermark from RealtimeToOfflineSegmentsTaskMetadata ZNode. 
WindowStart = watermark.
+      long windowStartMs = 
realtimeToOfflineSegmentsTaskMetadata.getWindowStartMs();
 
       // Find all COMPLETED segments with data overlapping execution window: 
windowStart (inclusive) to windowEnd
       // (exclusive)
-      List<String> segmentNames = new ArrayList<>();
-      List<String> downloadURLs = new ArrayList<>();
       Set<String> lastLLCSegmentPerPartition = new 
HashSet<>(partitionToLatestLLCSegmentName.values());
-      boolean skipGenerate = false;
-      while (true) {
-        // Check that execution window is older than bufferTime
-        if (windowEndMs > System.currentTimeMillis() - bufferMs) {
-          LOGGER.info(
-              "Window with start: {} and end: {} is not older than buffer 
time: {} configured as {} ago. Skipping task "
-                  + "generation: {}", windowStartMs, windowEndMs, bufferMs, 
bufferTimePeriod, taskType);
-          skipGenerate = true;
-          break;
+
+      // Get all offline table segments.
+      // These are used to validate if previous minion task was successful or 
not
+      String offlineTableName =
+          
TableNameBuilder.OFFLINE.tableNameWithType(TableNameBuilder.extractRawTableName(realtimeTableName));
+      Set<String> existingOfflineTableSegmentNames =
+          new 
HashSet<>(_clusterInfoAccessor.getPinotHelixResourceManager().getSegmentsFor(offlineTableName,
 true));
+
+      // In-case of previous minion task failures, get info
+      // of failed minion subtasks. They need to be reprocessed.
+      Set<String> failedTaskInputSegments =
+          getFailedTaskSegments(realtimeToOfflineSegmentsTaskMetadata, 
existingOfflineTableSegmentNames);
+
+      // In-case of partial failure of segments upload in prev minion task run,
+      // data is inconsistent, delete the corresponding offline segments 
immediately.
+      if (!failedTaskInputSegments.isEmpty()) {
+        deleteInvalidOfflineSegments(offlineTableName, 
failedTaskInputSegments, existingOfflineTableSegmentNames,
+            realtimeToOfflineSegmentsTaskMetadata);
+      }
+
+      List<SegmentZKMetadata> segmentsToBeReProcessed =
+          filterOutRemovedSegments(failedTaskInputSegments, 
completedRealtimeSegmentsZKMetadata);
+
+      // if no segment to be reprocessed, no failure
+      boolean prevMinionTaskSuccessful = segmentsToBeReProcessed.isEmpty();
+
+      List<List<String>> segmentNamesGroupList = new ArrayList<>();
+      Map<String, String> segmentNameVsDownloadURL = new HashMap<>();
+
+      // maxNumRecordsPerTask is used to divide a minion tasks among
+      // multiple subtasks to improve performance.
+      int maxNumRecordsPerTask =
+          
taskConfigs.get(MinionConstants.RealtimeToOfflineSegmentsTask.MAX_NUM_RECORDS_PER_TASK_KEY)
 != null
+              ? Integer.parseInt(
+              
taskConfigs.get(MinionConstants.RealtimeToOfflineSegmentsTask.MAX_NUM_RECORDS_PER_TASK_KEY))
+              : DEFAULT_MAX_NUM_RECORDS_PER_TASK;
+
+      List<SegmentZKMetadata> segmentsToBeScheduled;
+
+      if (!prevMinionTaskSuccessful) {
+        segmentsToBeScheduled = segmentsToBeReProcessed;
+      } else {
+        // if all offline segments of prev minion tasks were successfully 
uploaded,
+        // we can clear the state of prev minion tasks as now it's useless.
+        if 
(!realtimeToOfflineSegmentsTaskMetadata.getSegmentNameToExpectedSubtaskResultID().
+            isEmpty()) {
+          
realtimeToOfflineSegmentsTaskMetadata.getSegmentNameToExpectedSubtaskResultID().clear();
+          
realtimeToOfflineSegmentsTaskMetadata.getExpectedSubtaskResultMap().clear();
+          // windowEndTime of prev minion task needs to be re-used for picking 
up the
+          // next windowStartTime. This is useful for case where user changes 
minion config
+          // after a minion task run was complete. So windowStartTime cannot 
be watermark + bucketMs
+          windowStartMs = 
realtimeToOfflineSegmentsTaskMetadata.getWindowEndMs();
         }
+        long windowEndMs = windowStartMs + bucketMs;
+        // since window changed, pick new segments.
+        segmentsToBeScheduled =
+            generateNewSegmentsToProcess(completedRealtimeSegmentsZKMetadata, 
windowStartMs, windowEndMs, bucketMs,
+                bufferMs, bufferTimePeriod, lastLLCSegmentPerPartition, 
realtimeToOfflineSegmentsTaskMetadata);
+      }
 
-        for (SegmentZKMetadata segmentZKMetadata : 
completedSegmentsZKMetadata) {
-          String segmentName = segmentZKMetadata.getSegmentName();
-          long segmentStartTimeMs = segmentZKMetadata.getStartTimeMs();
-          long segmentEndTimeMs = segmentZKMetadata.getEndTimeMs();
-
-          // Check overlap with window
-          if (windowStartMs <= segmentEndTimeMs && segmentStartTimeMs < 
windowEndMs) {
-            // If last completed segment is being used, make sure that segment 
crosses over end of window.
-            // In the absence of this check, CONSUMING segments could contain 
some portion of the window. That data
-            // would be skipped forever.
-            if (lastLLCSegmentPerPartition.contains(segmentName) && 
segmentEndTimeMs < windowEndMs) {
-              LOGGER.info("Window data overflows into CONSUMING segments for 
partition of segment: {}. Skipping task "
-                  + "generation: {}", segmentName, taskType);
-              skipGenerate = true;
-              break;
-            }
-            segmentNames.add(segmentName);
-            downloadURLs.add(segmentZKMetadata.getDownloadUrl());
+      divideSegmentsAmongSubtasks(segmentsToBeScheduled, 
segmentNamesGroupList, segmentNameVsDownloadURL,
+          maxNumRecordsPerTask);
+
+      if (segmentNamesGroupList.isEmpty()) {
+        continue;
+      }
+
+      List<PinotTaskConfig> pinotTaskConfigsForTable = new ArrayList<>();
+      long newWindowStartTime = 
realtimeToOfflineSegmentsTaskMetadata.getWindowStartMs();
+      long newWindowEndTime = 
realtimeToOfflineSegmentsTaskMetadata.getWindowEndMs();
+
+      LOGGER.info(
+          "generating tasks for: {} with window start time: {}, window end 
time: {}, table: {}", taskType,
+          windowStartMs,
+          newWindowEndTime, realtimeTableName);
+
+      for (List<String> segmentNameList : segmentNamesGroupList) {
+        List<String> downloadURLList = getDownloadURLList(segmentNameList, 
segmentNameVsDownloadURL);
+        Preconditions.checkState(segmentNameList.size() == 
downloadURLList.size());
+        pinotTaskConfigsForTable.add(
+            createPinotTaskConfig(segmentNameList, downloadURLList, 
realtimeTableName, taskConfigs, tableConfig,
+                newWindowStartTime,
+                newWindowEndTime, taskType));
+      }
+      try {
+        _clusterInfoAccessor
+            .setMinionTaskMetadata(realtimeToOfflineSegmentsTaskMetadata,
+                MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
+                expectedVersion);
+      } catch (ZkBadVersionException e) {
+        LOGGER.error(
+            "Version changed while updating RTO task metadata for table: {}, 
skip scheduling. There are "
+                + "multiple task schedulers for the same table, need to 
investigate!", realtimeTableName);
+        // skip this table for this minion run
+        continue;
+      }
+
+      pinotTaskConfigs.addAll(pinotTaskConfigsForTable);
+
+      LOGGER.info("Finished generating task configs for table: {} for task: 
{}", realtimeTableName, taskType);
+    }
+    return pinotTaskConfigs;
+  }
+
+  @Override
+  public void validateTaskConfigs(TableConfig tableConfig, Map<String, String> 
taskConfigs) {
+    // check table is not upsert
+    Preconditions.checkState(tableConfig.getUpsertMode() == 
UpsertConfig.Mode.NONE,
+        "RealtimeToOfflineTask doesn't support upsert table!");
+    // check no malformed period
+    TimeUtils.convertPeriodToMillis(
+        
taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUFFER_TIME_PERIOD_KEY, 
"2d"));
+    TimeUtils.convertPeriodToMillis(
+        
taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUCKET_TIME_PERIOD_KEY, 
"1d"));
+    TimeUtils.convertPeriodToMillis(
+        
taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.ROUND_BUCKET_TIME_PERIOD_KEY,
 "1s"));
+    // check mergeType is correct
+    Preconditions.checkState(ImmutableSet.of(MergeType.CONCAT.name(), 
MergeType.ROLLUP.name(), MergeType.DEDUP.name())
+        
.contains(taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.MERGE_TYPE_KEY,
 MergeType.CONCAT.name())
+            .toUpperCase()), "MergeType must be one of [CONCAT, ROLLUP, 
DEDUP]!");
+
+    Schema schema = 
_clusterInfoAccessor.getPinotHelixResourceManager().getSchemaForTableConfig(tableConfig);
+    // check no mis-configured columns
+    Set<String> columnNames = schema.getColumnNames();
+    for (Map.Entry<String, String> entry : taskConfigs.entrySet()) {
+      if (entry.getKey().endsWith(".aggregationType")) {
+        Preconditions.checkState(columnNames.contains(
+                StringUtils.removeEnd(entry.getKey(), 
RealtimeToOfflineSegmentsTask.AGGREGATION_TYPE_KEY_SUFFIX)),
+            String.format("Column \"%s\" not found in schema!", 
entry.getKey()));
+        try {
+          // check that it's a valid aggregation function type
+          AggregationFunctionType aft = 
AggregationFunctionType.getAggregationFunctionType(entry.getValue());
+          // check that a value aggregator is available
+          if 
(!MinionConstants.RealtimeToOfflineSegmentsTask.AVAILABLE_CORE_VALUE_AGGREGATORS.contains(aft))
 {
+            throw new IllegalArgumentException("ValueAggregator not enabled 
for type: " + aft.toString());
           }
+        } catch (IllegalArgumentException e) {
+          String err =
+              String.format("Column \"%s\" has invalid aggregate type: %s", 
entry.getKey(), entry.getValue());
+          throw new IllegalStateException(err);
         }
-        if (skipGenerate || !segmentNames.isEmpty()) {
-          break;
-        }
+      }
+    }
+  }
 
-        LOGGER.info("Found no eligible segments for task: {} with window [{} - 
{}), moving to the next time bucket",
-            taskType, windowStartMs, windowEndMs);
-        windowStartMs = windowEndMs;
-        windowEndMs += bucketMs;
+  private List<String> getDownloadURLList(List<String> segmentNameList, 
Map<String, String> segmentNameVsDownloadURL) {
+    List<String> downloadURLList = new ArrayList<>();
+    for (String segmentName : segmentNameList) {
+      downloadURLList.add(segmentNameVsDownloadURL.get(segmentName));
+    }
+    return downloadURLList;
+  }
+
+  private void deleteInvalidOfflineSegments(String offlineTableName,
+      Set<String> realtimeSegmentsToBeReProcessed,
+      Set<String> existingOfflineTableSegmentNames,
+      RealtimeToOfflineSegmentsTaskMetadata 
realtimeToOfflineSegmentsTaskMetadata) {
+
+    Map<String, String> segmentNameToExpectedSubtaskResultID =
+        
realtimeToOfflineSegmentsTaskMetadata.getSegmentNameToExpectedSubtaskResultID();
+    Map<String, ExpectedSubtaskResult> expectedSubtaskResultMap =
+        realtimeToOfflineSegmentsTaskMetadata.getExpectedSubtaskResultMap();
+
+    Set<String> segmentsToBeDeleted = new HashSet<>();
+
+    for (String realtimeSegmentName : realtimeSegmentsToBeReProcessed) {
+      String id = 
segmentNameToExpectedSubtaskResultID.get(realtimeSegmentName);
+      Preconditions.checkNotNull(id);
+      ExpectedSubtaskResult expectedSubtaskResult =
+          expectedSubtaskResultMap.get(id);
+      // if already marked as failure, no need to delete again.
+      if (expectedSubtaskResult.isTaskFailure()) {
+        continue;
       }
+      List<String> expectedCorrespondingOfflineSegments = 
expectedSubtaskResult.getSegmentsTo();
+      segmentsToBeDeleted.addAll(

Review Comment:
   Add a log.info to indicate the number of segments that need to be deleted, if not the segment names themselves (in case it's a long list).
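   Something along these lines, as a sketch reusing the variables already in deleteInvalidOfflineSegments:

```java
// Sketch: log the count at info level and the full list only at debug level to avoid flooding.
if (!segmentsToBeDeleted.isEmpty()) {
  LOGGER.info("Deleting {} offline segments of table: {} left behind by failed RTO subtasks",
      segmentsToBeDeleted.size(), offlineTableName);
  LOGGER.debug("Offline segments to be deleted: {}", segmentsToBeDeleted);
  _clusterInfoAccessor.getPinotHelixResourceManager()
      .deleteSegments(offlineTableName, new ArrayList<>(segmentsToBeDeleted));
}
```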



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java:
##########
@@ -147,97 +157,385 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
       long bucketMs = TimeUtils.convertPeriodToMillis(bucketTimePeriod);
       long bufferMs = TimeUtils.convertPeriodToMillis(bufferTimePeriod);
 
-      // Get watermark from RealtimeToOfflineSegmentsTaskMetadata ZNode. 
WindowStart = watermark. WindowEnd =
-      // windowStart + bucket.
-      long windowStartMs = getWatermarkMs(realtimeTableName, 
completedSegmentsZKMetadata, bucketMs);
-      long windowEndMs = windowStartMs + bucketMs;
+      ZNRecord realtimeToOfflineZNRecord =
+          
_clusterInfoAccessor.getMinionTaskMetadataZNRecord(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
+              realtimeTableName);
+      int expectedVersion = realtimeToOfflineZNRecord != null ? 
realtimeToOfflineZNRecord.getVersion() : -1;
+      RealtimeToOfflineSegmentsTaskMetadata 
realtimeToOfflineSegmentsTaskMetadata =
+          getRTOTaskMetadata(realtimeTableName, 
completedRealtimeSegmentsZKMetadata, bucketMs,
+              realtimeToOfflineZNRecord);
+
+      // Get watermark from RealtimeToOfflineSegmentsTaskMetadata ZNode. 
WindowStart = watermark.
+      long windowStartMs = 
realtimeToOfflineSegmentsTaskMetadata.getWindowStartMs();
 
       // Find all COMPLETED segments with data overlapping execution window: 
windowStart (inclusive) to windowEnd
       // (exclusive)
-      List<String> segmentNames = new ArrayList<>();
-      List<String> downloadURLs = new ArrayList<>();
       Set<String> lastLLCSegmentPerPartition = new 
HashSet<>(partitionToLatestLLCSegmentName.values());
-      boolean skipGenerate = false;
-      while (true) {
-        // Check that execution window is older than bufferTime
-        if (windowEndMs > System.currentTimeMillis() - bufferMs) {
-          LOGGER.info(
-              "Window with start: {} and end: {} is not older than buffer 
time: {} configured as {} ago. Skipping task "
-                  + "generation: {}", windowStartMs, windowEndMs, bufferMs, 
bufferTimePeriod, taskType);
-          skipGenerate = true;
-          break;
+
+      // Get all offline table segments.
+      // These are used to validate if previous minion task was successful or 
not
+      String offlineTableName =
+          
TableNameBuilder.OFFLINE.tableNameWithType(TableNameBuilder.extractRawTableName(realtimeTableName));
+      Set<String> existingOfflineTableSegmentNames =
+          new 
HashSet<>(_clusterInfoAccessor.getPinotHelixResourceManager().getSegmentsFor(offlineTableName,
 true));
+
+      // In-case of previous minion task failures, get info
+      // of failed minion subtasks. They need to be reprocessed.
+      Set<String> failedTaskInputSegments =
+          getFailedTaskSegments(realtimeToOfflineSegmentsTaskMetadata, 
existingOfflineTableSegmentNames);
+
+      // In-case of partial failure of segments upload in prev minion task run,
+      // data is inconsistent, delete the corresponding offline segments 
immediately.
+      if (!failedTaskInputSegments.isEmpty()) {
+        deleteInvalidOfflineSegments(offlineTableName, 
failedTaskInputSegments, existingOfflineTableSegmentNames,
+            realtimeToOfflineSegmentsTaskMetadata);
+      }
+
+      List<SegmentZKMetadata> segmentsToBeReProcessed =
+          filterOutRemovedSegments(failedTaskInputSegments, 
completedRealtimeSegmentsZKMetadata);
+
+      // if no segment to be reprocessed, no failure
+      boolean prevMinionTaskSuccessful = segmentsToBeReProcessed.isEmpty();
+
+      List<List<String>> segmentNamesGroupList = new ArrayList<>();
+      Map<String, String> segmentNameVsDownloadURL = new HashMap<>();
+
+      // maxNumRecordsPerTask is used to divide a minion tasks among
+      // multiple subtasks to improve performance.
+      int maxNumRecordsPerTask =
+          
taskConfigs.get(MinionConstants.RealtimeToOfflineSegmentsTask.MAX_NUM_RECORDS_PER_TASK_KEY)
 != null
+              ? Integer.parseInt(
+              
taskConfigs.get(MinionConstants.RealtimeToOfflineSegmentsTask.MAX_NUM_RECORDS_PER_TASK_KEY))
+              : DEFAULT_MAX_NUM_RECORDS_PER_TASK;
+
+      List<SegmentZKMetadata> segmentsToBeScheduled;
+
+      if (!prevMinionTaskSuccessful) {
+        segmentsToBeScheduled = segmentsToBeReProcessed;
+      } else {
+        // if all offline segments of prev minion tasks were successfully 
uploaded,
+        // we can clear the state of prev minion tasks as now it's useless.
+        if 
(!realtimeToOfflineSegmentsTaskMetadata.getSegmentNameToExpectedSubtaskResultID().
+            isEmpty()) {
+          
realtimeToOfflineSegmentsTaskMetadata.getSegmentNameToExpectedSubtaskResultID().clear();

Review Comment:
   This is idempotent and should be fine upon re-execution, right? (i.e. if there's a failure prior to updating this state in ZK)



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java:
##########
@@ -147,97 +157,385 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
       long bucketMs = TimeUtils.convertPeriodToMillis(bucketTimePeriod);
       long bufferMs = TimeUtils.convertPeriodToMillis(bufferTimePeriod);
 
-      // Get watermark from RealtimeToOfflineSegmentsTaskMetadata ZNode. 
WindowStart = watermark. WindowEnd =
-      // windowStart + bucket.
-      long windowStartMs = getWatermarkMs(realtimeTableName, 
completedSegmentsZKMetadata, bucketMs);
-      long windowEndMs = windowStartMs + bucketMs;
+      ZNRecord realtimeToOfflineZNRecord =
+          
_clusterInfoAccessor.getMinionTaskMetadataZNRecord(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
+              realtimeTableName);
+      int expectedVersion = realtimeToOfflineZNRecord != null ? 
realtimeToOfflineZNRecord.getVersion() : -1;
+      RealtimeToOfflineSegmentsTaskMetadata 
realtimeToOfflineSegmentsTaskMetadata =
+          getRTOTaskMetadata(realtimeTableName, 
completedRealtimeSegmentsZKMetadata, bucketMs,
+              realtimeToOfflineZNRecord);
+
+      // Get watermark from RealtimeToOfflineSegmentsTaskMetadata ZNode. 
WindowStart = watermark.
+      long windowStartMs = 
realtimeToOfflineSegmentsTaskMetadata.getWindowStartMs();
 
       // Find all COMPLETED segments with data overlapping execution window: 
windowStart (inclusive) to windowEnd
       // (exclusive)
-      List<String> segmentNames = new ArrayList<>();
-      List<String> downloadURLs = new ArrayList<>();
       Set<String> lastLLCSegmentPerPartition = new 
HashSet<>(partitionToLatestLLCSegmentName.values());
-      boolean skipGenerate = false;
-      while (true) {
-        // Check that execution window is older than bufferTime
-        if (windowEndMs > System.currentTimeMillis() - bufferMs) {
-          LOGGER.info(
-              "Window with start: {} and end: {} is not older than buffer 
time: {} configured as {} ago. Skipping task "
-                  + "generation: {}", windowStartMs, windowEndMs, bufferMs, 
bufferTimePeriod, taskType);
-          skipGenerate = true;
-          break;
+
+      // Get all offline table segments.
+      // These are used to validate if previous minion task was successful or 
not
+      String offlineTableName =
+          
TableNameBuilder.OFFLINE.tableNameWithType(TableNameBuilder.extractRawTableName(realtimeTableName));
+      Set<String> existingOfflineTableSegmentNames =
+          new 
HashSet<>(_clusterInfoAccessor.getPinotHelixResourceManager().getSegmentsFor(offlineTableName,
 true));
+
+      // In-case of previous minion task failures, get info
+      // of failed minion subtasks. They need to be reprocessed.
+      Set<String> failedTaskInputSegments =
+          getFailedTaskSegments(realtimeToOfflineSegmentsTaskMetadata, 
existingOfflineTableSegmentNames);
+
+      // In-case of partial failure of segments upload in prev minion task run,
+      // data is inconsistent, delete the corresponding offline segments 
immediately.
+      if (!failedTaskInputSegments.isEmpty()) {
+        deleteInvalidOfflineSegments(offlineTableName, 
failedTaskInputSegments, existingOfflineTableSegmentNames,
+            realtimeToOfflineSegmentsTaskMetadata);
+      }
+
+      List<SegmentZKMetadata> segmentsToBeReProcessed =
+          filterOutRemovedSegments(failedTaskInputSegments, 
completedRealtimeSegmentsZKMetadata);
+
+      // if no segment to be reprocessed, no failure
+      boolean prevMinionTaskSuccessful = segmentsToBeReProcessed.isEmpty();
+
+      List<List<String>> segmentNamesGroupList = new ArrayList<>();
+      Map<String, String> segmentNameVsDownloadURL = new HashMap<>();
+
+      // maxNumRecordsPerTask is used to divide a minion tasks among
+      // multiple subtasks to improve performance.
+      int maxNumRecordsPerTask =
+          
taskConfigs.get(MinionConstants.RealtimeToOfflineSegmentsTask.MAX_NUM_RECORDS_PER_TASK_KEY)
 != null
+              ? Integer.parseInt(
+              
taskConfigs.get(MinionConstants.RealtimeToOfflineSegmentsTask.MAX_NUM_RECORDS_PER_TASK_KEY))
+              : DEFAULT_MAX_NUM_RECORDS_PER_TASK;
+
+      List<SegmentZKMetadata> segmentsToBeScheduled;
+
+      if (!prevMinionTaskSuccessful) {
+        segmentsToBeScheduled = segmentsToBeReProcessed;
+      } else {
+        // if all offline segments of prev minion tasks were successfully 
uploaded,
+        // we can clear the state of prev minion tasks as now it's useless.
+        if 
(!realtimeToOfflineSegmentsTaskMetadata.getSegmentNameToExpectedSubtaskResultID().
+            isEmpty()) {
+          
realtimeToOfflineSegmentsTaskMetadata.getSegmentNameToExpectedSubtaskResultID().clear();
+          
realtimeToOfflineSegmentsTaskMetadata.getExpectedSubtaskResultMap().clear();
+          // windowEndTime of prev minion task needs to be re-used for picking 
up the
+          // next windowStartTime. This is useful for case where user changes 
minion config
+          // after a minion task run was complete. So windowStartTime cannot 
be watermark + bucketMs
+          windowStartMs = 
realtimeToOfflineSegmentsTaskMetadata.getWindowEndMs();
         }
+        long windowEndMs = windowStartMs + bucketMs;
+        // since window changed, pick new segments.
+        segmentsToBeScheduled =
+            generateNewSegmentsToProcess(completedRealtimeSegmentsZKMetadata, 
windowStartMs, windowEndMs, bucketMs,
+                bufferMs, bufferTimePeriod, lastLLCSegmentPerPartition, 
realtimeToOfflineSegmentsTaskMetadata);
+      }
 
-        for (SegmentZKMetadata segmentZKMetadata : 
completedSegmentsZKMetadata) {
-          String segmentName = segmentZKMetadata.getSegmentName();
-          long segmentStartTimeMs = segmentZKMetadata.getStartTimeMs();
-          long segmentEndTimeMs = segmentZKMetadata.getEndTimeMs();
-
-          // Check overlap with window
-          if (windowStartMs <= segmentEndTimeMs && segmentStartTimeMs < 
windowEndMs) {
-            // If last completed segment is being used, make sure that segment 
crosses over end of window.
-            // In the absence of this check, CONSUMING segments could contain 
some portion of the window. That data
-            // would be skipped forever.
-            if (lastLLCSegmentPerPartition.contains(segmentName) && 
segmentEndTimeMs < windowEndMs) {
-              LOGGER.info("Window data overflows into CONSUMING segments for 
partition of segment: {}. Skipping task "
-                  + "generation: {}", segmentName, taskType);
-              skipGenerate = true;
-              break;
-            }
-            segmentNames.add(segmentName);
-            downloadURLs.add(segmentZKMetadata.getDownloadUrl());
+      divideSegmentsAmongSubtasks(segmentsToBeScheduled, 
segmentNamesGroupList, segmentNameVsDownloadURL,
+          maxNumRecordsPerTask);
+
+      if (segmentNamesGroupList.isEmpty()) {
+        continue;
+      }
+
+      List<PinotTaskConfig> pinotTaskConfigsForTable = new ArrayList<>();
+      long newWindowStartTime = 
realtimeToOfflineSegmentsTaskMetadata.getWindowStartMs();
+      long newWindowEndTime = 
realtimeToOfflineSegmentsTaskMetadata.getWindowEndMs();
+
+      LOGGER.info(
+          "generating tasks for: {} with window start time: {}, window end 
time: {}, table: {}", taskType,
+          windowStartMs,
+          newWindowEndTime, realtimeTableName);
+
+      for (List<String> segmentNameList : segmentNamesGroupList) {
+        List<String> downloadURLList = getDownloadURLList(segmentNameList, 
segmentNameVsDownloadURL);
+        Preconditions.checkState(segmentNameList.size() == 
downloadURLList.size());
+        pinotTaskConfigsForTable.add(
+            createPinotTaskConfig(segmentNameList, downloadURLList, 
realtimeTableName, taskConfigs, tableConfig,
+                newWindowStartTime,
+                newWindowEndTime, taskType));
+      }
+      try {
+        _clusterInfoAccessor
+            .setMinionTaskMetadata(realtimeToOfflineSegmentsTaskMetadata,
+                MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
+                expectedVersion);
+      } catch (ZkBadVersionException e) {
+        LOGGER.error(
+            "Version changed while updating RTO task metadata for table: {}, 
skip scheduling. There are "
+                + "multiple task schedulers for the same table, need to 
investigate!", realtimeTableName);
+        // skip this table for this minion run
+        continue;
+      }
+
+      pinotTaskConfigs.addAll(pinotTaskConfigsForTable);
+
+      LOGGER.info("Finished generating task configs for table: {} for task: 
{}", realtimeTableName, taskType);
+    }
+    return pinotTaskConfigs;
+  }
+
+  @Override
+  public void validateTaskConfigs(TableConfig tableConfig, Map<String, String> 
taskConfigs) {
+    // check table is not upsert
+    Preconditions.checkState(tableConfig.getUpsertMode() == 
UpsertConfig.Mode.NONE,
+        "RealtimeToOfflineTask doesn't support upsert table!");
+    // check no malformed period
+    TimeUtils.convertPeriodToMillis(
+        
taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUFFER_TIME_PERIOD_KEY, 
"2d"));
+    TimeUtils.convertPeriodToMillis(
+        
taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUCKET_TIME_PERIOD_KEY, 
"1d"));
+    TimeUtils.convertPeriodToMillis(
+        
taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.ROUND_BUCKET_TIME_PERIOD_KEY,
 "1s"));
+    // check mergeType is correct
+    Preconditions.checkState(ImmutableSet.of(MergeType.CONCAT.name(), 
MergeType.ROLLUP.name(), MergeType.DEDUP.name())
+        
.contains(taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.MERGE_TYPE_KEY,
 MergeType.CONCAT.name())
+            .toUpperCase()), "MergeType must be one of [CONCAT, ROLLUP, 
DEDUP]!");
+
+    Schema schema = 
_clusterInfoAccessor.getPinotHelixResourceManager().getSchemaForTableConfig(tableConfig);
+    // check no mis-configured columns
+    Set<String> columnNames = schema.getColumnNames();
+    for (Map.Entry<String, String> entry : taskConfigs.entrySet()) {
+      if (entry.getKey().endsWith(".aggregationType")) {
+        Preconditions.checkState(columnNames.contains(
+                StringUtils.removeEnd(entry.getKey(), 
RealtimeToOfflineSegmentsTask.AGGREGATION_TYPE_KEY_SUFFIX)),
+            String.format("Column \"%s\" not found in schema!", 
entry.getKey()));
+        try {
+          // check that it's a valid aggregation function type
+          AggregationFunctionType aft = 
AggregationFunctionType.getAggregationFunctionType(entry.getValue());
+          // check that a value aggregator is available
+          if 
(!MinionConstants.RealtimeToOfflineSegmentsTask.AVAILABLE_CORE_VALUE_AGGREGATORS.contains(aft))
 {
+            throw new IllegalArgumentException("ValueAggregator not enabled 
for type: " + aft.toString());
           }
+        } catch (IllegalArgumentException e) {
+          String err =
+              String.format("Column \"%s\" has invalid aggregate type: %s", 
entry.getKey(), entry.getValue());
+          throw new IllegalStateException(err);
         }
-        if (skipGenerate || !segmentNames.isEmpty()) {
-          break;
-        }
+      }
+    }
+  }
 
-        LOGGER.info("Found no eligible segments for task: {} with window [{} - 
{}), moving to the next time bucket",
-            taskType, windowStartMs, windowEndMs);
-        windowStartMs = windowEndMs;
-        windowEndMs += bucketMs;
+  private List<String> getDownloadURLList(List<String> segmentNameList, 
Map<String, String> segmentNameVsDownloadURL) {
+    List<String> downloadURLList = new ArrayList<>();
+    for (String segmentName : segmentNameList) {
+      downloadURLList.add(segmentNameVsDownloadURL.get(segmentName));
+    }
+    return downloadURLList;
+  }
+
+  private void deleteInvalidOfflineSegments(String offlineTableName,
+      Set<String> realtimeSegmentsToBeReProcessed,
+      Set<String> existingOfflineTableSegmentNames,
+      RealtimeToOfflineSegmentsTaskMetadata 
realtimeToOfflineSegmentsTaskMetadata) {
+
+    Map<String, String> segmentNameToExpectedSubtaskResultID =
+        
realtimeToOfflineSegmentsTaskMetadata.getSegmentNameToExpectedSubtaskResultID();
+    Map<String, ExpectedSubtaskResult> expectedSubtaskResultMap =
+        realtimeToOfflineSegmentsTaskMetadata.getExpectedSubtaskResultMap();
+
+    Set<String> segmentsToBeDeleted = new HashSet<>();
+
+    for (String realtimeSegmentName : realtimeSegmentsToBeReProcessed) {
+      String id = 
segmentNameToExpectedSubtaskResultID.get(realtimeSegmentName);
+      Preconditions.checkNotNull(id);
+      ExpectedSubtaskResult expectedSubtaskResult =
+          expectedSubtaskResultMap.get(id);
+      // if already marked as failure, no need to delete again.
+      if (expectedSubtaskResult.isTaskFailure()) {
+        continue;
       }
+      List<String> expectedCorrespondingOfflineSegments = 
expectedSubtaskResult.getSegmentsTo();
+      segmentsToBeDeleted.addAll(
+          getSegmentsToDelete(expectedCorrespondingOfflineSegments, 
existingOfflineTableSegmentNames));
+      // The expectedRealtimeToOfflineTaskResult is confirmed to be
+      // related to a failed task. Mark it as a failure, since executor will
+      // then only replace expectedRealtimeToOfflineTaskResult for the
+      // segments to be reprocessed.
+      expectedSubtaskResult.setTaskFailure();
+    }
+
+    if (!segmentsToBeDeleted.isEmpty()) {
+      _clusterInfoAccessor.getPinotHelixResourceManager()
+          .deleteSegments(offlineTableName, new 
ArrayList<>(segmentsToBeDeleted));
+    }
+  }
+
+  private Set<String> getFailedTaskSegments(
+      RealtimeToOfflineSegmentsTaskMetadata 
realtimeToOfflineSegmentsTaskMetadata,
+      Set<String> existingOfflineTableSegmentNames) {
+    Set<String> failedIds = new HashSet<>();
+
+    // Get all the ExpectedRealtimeToOfflineTaskResult of prev minion task
+    Map<String, ExpectedSubtaskResult> expectedSubtaskResultMap =
+        realtimeToOfflineSegmentsTaskMetadata.getExpectedSubtaskResultMap();
+    Collection<ExpectedSubtaskResult> expectedSubtaskResultList =
+        expectedSubtaskResultMap.values();
+
+    Map<String, String> segmentNameToExpectedSubtaskResultID =
+        
realtimeToOfflineSegmentsTaskMetadata.getSegmentNameToExpectedSubtaskResultID();
+    Set<String> expectedSubtaskResultIds =
+        new HashSet<>(segmentNameToExpectedSubtaskResultID.values());
+
+    Set<String> segmentNamesToReprocess = new HashSet<>();
 
-      if (skipGenerate) {
+    // Check what all offline segments are present currently
+    for (ExpectedSubtaskResult expectedSubtaskResult
+        : expectedSubtaskResultList) {
+
+      if (expectedSubtaskResult.isTaskFailure()) {
+        // if task is failure and is referenced by any segment, only then add 
to failed task.
+        if (expectedSubtaskResultIds.contains(expectedSubtaskResult.getId())) {
+          failedIds.add(expectedSubtaskResult.getId());
+        }
         continue;
       }
 
-      Map<String, String> configs = 
MinionTaskUtils.getPushTaskConfig(realtimeTableName, taskConfigs,
-          _clusterInfoAccessor);
-      configs.putAll(getBaseTaskConfigs(tableConfig, segmentNames));
-      configs.put(MinionConstants.DOWNLOAD_URL_KEY, 
StringUtils.join(downloadURLs, MinionConstants.URL_SEPARATOR));
-      configs.put(MinionConstants.UPLOAD_URL_KEY, 
_clusterInfoAccessor.getVipUrl() + "/segments");
-
-      // Segment processor configs
-      configs.put(RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY, 
String.valueOf(windowStartMs));
-      configs.put(RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY, 
String.valueOf(windowEndMs));
-      String roundBucketTimePeriod = 
taskConfigs.get(RealtimeToOfflineSegmentsTask.ROUND_BUCKET_TIME_PERIOD_KEY);
-      if (roundBucketTimePeriod != null) {
-        
configs.put(RealtimeToOfflineSegmentsTask.ROUND_BUCKET_TIME_PERIOD_KEY, 
roundBucketTimePeriod);
+      // get offline segments
+      List<String> segmentTo = expectedSubtaskResult.getSegmentsTo();
+
+      // If not all corresponding offline segments to a realtime segment 
exists,
+      // it means there was an issue with prev minion task. And segment needs
+      // to be re-processed.
+      boolean taskSuccessful = checkIfAllSegmentsExists(segmentTo, 
existingOfflineTableSegmentNames);

Review Comment:
   This is despite the taskFailure check (isTaskFailure)? If that's the case, do we really need the taskFailure status at all, or could we instead just validate segmentsTo against the offline table segments for existence?
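   To make the suggestion concrete, a sketch of what the check could collapse to if segment existence is the only signal. getSegmentsTo() and getSegmentsFrom() are assumed getters on ExpectedSubtaskResult (only getSegmentsTo() appears in the quoted code), and whatever else the taskFailure flag guards would need to be covered separately:

```java
// Sketch: a subtask is considered failed purely because not all of its expected offline
// segments exist in the offline table; its input realtime segments are then reprocessed.
private Set<String> getSegmentsToReprocess(Collection<ExpectedSubtaskResult> expectedResults,
    Set<String> existingOfflineTableSegmentNames) {
  Set<String> segmentsToReprocess = new HashSet<>();
  for (ExpectedSubtaskResult result : expectedResults) {
    if (!existingOfflineTableSegmentNames.containsAll(result.getSegmentsTo())) {
      segmentsToReprocess.addAll(result.getSegmentsFrom());
    }
  }
  return segmentsToReprocess;
}
```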



##########
pinot-common/src/main/java/org/apache/pinot/common/minion/ExpectedSubtaskResult.java:
##########
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.minion;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.UUID;
+
+
+/**
+ * ExpectedRealtimeOfflineTaskResult is created in
+ * {@link org.apache.pinot.plugin.minion.tasks.realtimetoofflinesegments.RealtimeToOfflineSegmentsTaskExecutor}
+ * before uploading offline segment(s) to the offline table.
+ *
+ *  The <code>_segmentsFrom</code> denotes the input realtime segments.
+ *  The <code>_segmentsTo</code> denotes the expected offline segments.
+ *  The <code>_id</code> denotes the unique identifier of the object.
+ *  The <code>_taskID</code> denotes the minion taskId.
+ *  The <code>_taskFailure</code> denotes the failure status of the minion task handling the
+ *    current ExpectedResult. This is modified in
+ *    {@link org.apache.pinot.plugin.minion.tasks.realtimetoofflinesegments.RealtimeToOfflineSegmentsTaskGenerator}
+ *    when a previous minion task has failed.
+ *
+ */
+public class ExpectedSubtaskResult {

Review Comment:
   Can we name this something like RealtimeToOfflineCheckpoint?



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java:
##########
@@ -156,6 +162,11 @@ protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig,
     // Segment config
     segmentProcessorConfigBuilder.setSegmentConfig(MergeTaskUtils.getSegmentConfig(configs));
 
+    // Since multiple subtasks run in parallel, there shouldn't be a name conflict.
+    // Append uuid
+    segmentProcessorConfigBuilder.setSegmentNameGenerator(

Review Comment:
   I feel this should be left to the user to set up the naming scheme, and we should pick a default/generic naming scheme instead of a specific one. Can we not use the default name generator in SegmentProcessorFramework? Isn't the time-based normalized name generator used?



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java:
##########
@@ -99,12 +107,10 @@ public void preProcess(PinotTaskConfig pinotTaskConfig) {
     RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata =
         RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(realtimeToOfflineSegmentsTaskZNRecord);
     long windowStartMs = Long.parseLong(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY));
-    Preconditions.checkState(realtimeToOfflineSegmentsTaskMetadata.getWatermarkMs() <= windowStartMs,
+    Preconditions.checkState(realtimeToOfflineSegmentsTaskMetadata.getWindowStartMs() == windowStartMs,

Review Comment:
   Nice, stricter check! Please confirm this.



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java:
##########
@@ -107,6 +115,8 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
       LOGGER.info("Start generating task configs for table: {} for task: {}", realtimeTableName, taskType);
 
       // Only schedule 1 task of this type, per table
+      // Still there can be scenario where generator can generate additional task, while previous task
+      // is just about to be enqueued in the helix queue.

Review Comment:
   Can you elaborate on this comment? 



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java:
##########
@@ -147,97 +157,385 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
       long bucketMs = TimeUtils.convertPeriodToMillis(bucketTimePeriod);
       long bufferMs = TimeUtils.convertPeriodToMillis(bufferTimePeriod);
 
-      // Get watermark from RealtimeToOfflineSegmentsTaskMetadata ZNode. 
WindowStart = watermark. WindowEnd =
-      // windowStart + bucket.
-      long windowStartMs = getWatermarkMs(realtimeTableName, 
completedSegmentsZKMetadata, bucketMs);
-      long windowEndMs = windowStartMs + bucketMs;
+      ZNRecord realtimeToOfflineZNRecord =
+          
_clusterInfoAccessor.getMinionTaskMetadataZNRecord(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
+              realtimeTableName);
+      int expectedVersion = realtimeToOfflineZNRecord != null ? 
realtimeToOfflineZNRecord.getVersion() : -1;
+      RealtimeToOfflineSegmentsTaskMetadata 
realtimeToOfflineSegmentsTaskMetadata =
+          getRTOTaskMetadata(realtimeTableName, 
completedRealtimeSegmentsZKMetadata, bucketMs,
+              realtimeToOfflineZNRecord);
+
+      // Get watermark from RealtimeToOfflineSegmentsTaskMetadata ZNode. 
WindowStart = watermark.
+      long windowStartMs = 
realtimeToOfflineSegmentsTaskMetadata.getWindowStartMs();
 
       // Find all COMPLETED segments with data overlapping execution window: 
windowStart (inclusive) to windowEnd
       // (exclusive)
-      List<String> segmentNames = new ArrayList<>();
-      List<String> downloadURLs = new ArrayList<>();
       Set<String> lastLLCSegmentPerPartition = new 
HashSet<>(partitionToLatestLLCSegmentName.values());
-      boolean skipGenerate = false;
-      while (true) {
-        // Check that execution window is older than bufferTime
-        if (windowEndMs > System.currentTimeMillis() - bufferMs) {
-          LOGGER.info(
-              "Window with start: {} and end: {} is not older than buffer 
time: {} configured as {} ago. Skipping task "
-                  + "generation: {}", windowStartMs, windowEndMs, bufferMs, 
bufferTimePeriod, taskType);
-          skipGenerate = true;
-          break;
+
+      // Get all offline table segments.

Review Comment:
   Please add sufficient log.info (without flooding :)) to help someone debug what's going on, specifically differentiating between the happy and failure paths. When there is a failure, log what action took place and which segments had to be reprocessed or deleted.
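   For example, a sketch of the kind of log lines meant here, reusing the variable names from the quoted hunk:

```java
// Sketch: one line that distinguishes the happy path from the recovery path.
if (failedTaskInputSegments.isEmpty()) {
  LOGGER.info("No failed RTO subtasks found for table: {}, picking segments for the next window",
      realtimeTableName);
} else {
  LOGGER.info("Found {} realtime segments from failed RTO subtasks for table: {}; they will be "
      + "reprocessed and their partially uploaded offline segments deleted",
      failedTaskInputSegments.size(), realtimeTableName);
}
```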
   



##########
pinot-common/src/main/java/org/apache/pinot/common/minion/RealtimeToOfflineSegmentsTaskMetadata.java:
##########
@@ -18,57 +18,183 @@
  */
 package org.apache.pinot.common.minion;
 
+import com.google.common.base.Preconditions;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 
 
 /**
- * Metadata for the minion task of type <code>RealtimeToOfflineSegmentsTask</code>.
- * The <code>watermarkMs</code> denotes the time (exclusive) upto which tasks have been executed.
- *
+ * Metadata for the minion task of type <code>RealtimeToOfflineSegmentsTask</code>. The <code>_windowStartMs</code>
+ * denotes the time (exclusive) until which it's certain that tasks have been completed successfully. The
+ * <code>_expectedSubtaskResultMap</code> contains the expected RTO task result info. This map can contain expected
+ * results of both completed and incomplete tasks. This map is used by the generator to validate whether a potential
+ * segment (for an RTO task) has already been successfully processed as an RTO task in the past or not. The
+ * <code>_windowStartMs</code> and <code>_windowEndMs</code> denote the window bucket time of the minion task that has
+ * not yet completed successfully, i.e. bucket: [_windowStartMs, _windowEndMs). The window is updated by the generator
+ * when it's certain that the previous minion task run was successful.
+ * <p>
  * This gets serialized and stored in zookeeper under the path
  * MINION_TASK_METADATA/${tableNameWithType}/RealtimeToOfflineSegmentsTask
- *
- * PinotTaskGenerator:
- * The <code>watermarkMs</code>> is used by the <code>RealtimeToOfflineSegmentsTaskGenerator</code>,
- * to determine the window of execution for the task it is generating.
- * The window of execution will be [watermarkMs, watermarkMs + bucketSize)
- *
- * PinotTaskExecutor:
- * The same watermark is used by the <code>RealtimeToOfflineSegmentsTaskExecutor</code>, to:
- * - Verify that is is running the latest task scheduled by the task generator
- * - Update the watermark as the end of the window that it executed for
+ * <p>
+ * PinotTaskGenerator: The <code>_windowStartMs</code> is used by the
+ * <code>RealtimeToOfflineSegmentsTaskGenerator</code> to determine the window of execution of the previous task,
+ * based on which it generates the new task.
+ * <p>
+ * PinotTaskExecutor: The same windowStartMs is used by the <code>RealtimeToOfflineSegmentsTaskExecutor</code>, to:
+ * - Verify that it's running the latest task scheduled by the task generator.
+ * - The _expectedSubtaskResultMap is updated before the offline segments are uploaded to the table.
  */
 public class RealtimeToOfflineSegmentsTaskMetadata extends BaseTaskMetadata {
 
-  private static final String WATERMARK_KEY = "watermarkMs";
+  private static final String WINDOW_START_KEY = "watermarkMs";
+  private static final String WINDOW_END_KEY = "windowEndMs";
+  private static final String COMMA_SEPARATOR = ",";
+  private static final String SEGMENT_NAME_TO_EXPECTED_SUBTASK_RESULT_ID_KEY = "segmentToExpectedSubtaskResultId";
 
   private final String _tableNameWithType;
-  private final long _watermarkMs;
+  private long _windowStartMs;
+  private long _windowEndMs;
+  private final Map<String, ExpectedSubtaskResult> _expectedSubtaskResultMap;
+  private final Map<String, String> _segmentNameToExpectedSubtaskResultID;

Review Comment:
   Why do we need this map? Can we not use the previous map itself to get the list of segmentsFrom and segmentsTo for a specific task ID, and then construct this map in memory from that?
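   Roughly what is meant, as a sketch: derive the mapping from the existing ExpectedSubtaskResult map instead of persisting a second map in ZK. getId() appears in the quoted code; getSegmentsFrom() is an assumed getter on ExpectedSubtaskResult:

```java
// Sketch: build segmentName -> ExpectedSubtaskResult id in memory from the existing map.
private static Map<String, String> buildSegmentNameToResultId(
    Map<String, ExpectedSubtaskResult> expectedSubtaskResultMap) {
  Map<String, String> segmentNameToResultId = new HashMap<>();
  for (ExpectedSubtaskResult result : expectedSubtaskResultMap.values()) {
    for (String realtimeSegmentName : result.getSegmentsFrom()) {
      segmentNameToResultId.put(realtimeSegmentName, result.getId());
    }
  }
  return segmentNameToResultId;
}
```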



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java:
##########
@@ -147,97 +157,385 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
       long bucketMs = TimeUtils.convertPeriodToMillis(bucketTimePeriod);
       long bufferMs = TimeUtils.convertPeriodToMillis(bufferTimePeriod);
 
-      // Get watermark from RealtimeToOfflineSegmentsTaskMetadata ZNode. 
WindowStart = watermark. WindowEnd =
-      // windowStart + bucket.
-      long windowStartMs = getWatermarkMs(realtimeTableName, 
completedSegmentsZKMetadata, bucketMs);
-      long windowEndMs = windowStartMs + bucketMs;
+      ZNRecord realtimeToOfflineZNRecord =
+          
_clusterInfoAccessor.getMinionTaskMetadataZNRecord(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
+              realtimeTableName);
+      int expectedVersion = realtimeToOfflineZNRecord != null ? 
realtimeToOfflineZNRecord.getVersion() : -1;
+      RealtimeToOfflineSegmentsTaskMetadata 
realtimeToOfflineSegmentsTaskMetadata =
+          getRTOTaskMetadata(realtimeTableName, 
completedRealtimeSegmentsZKMetadata, bucketMs,
+              realtimeToOfflineZNRecord);
+
+      // Get watermark from RealtimeToOfflineSegmentsTaskMetadata ZNode. 
WindowStart = watermark.
+      long windowStartMs = 
realtimeToOfflineSegmentsTaskMetadata.getWindowStartMs();
 
       // Find all COMPLETED segments with data overlapping execution window: 
windowStart (inclusive) to windowEnd
       // (exclusive)
-      List<String> segmentNames = new ArrayList<>();
-      List<String> downloadURLs = new ArrayList<>();
       Set<String> lastLLCSegmentPerPartition = new 
HashSet<>(partitionToLatestLLCSegmentName.values());
-      boolean skipGenerate = false;
-      while (true) {
-        // Check that execution window is older than bufferTime
-        if (windowEndMs > System.currentTimeMillis() - bufferMs) {
-          LOGGER.info(
-              "Window with start: {} and end: {} is not older than buffer 
time: {} configured as {} ago. Skipping task "
-                  + "generation: {}", windowStartMs, windowEndMs, bufferMs, 
bufferTimePeriod, taskType);
-          skipGenerate = true;
-          break;
+
+      // Get all offline table segments.
+      // These are used to validate if previous minion task was successful or 
not
+      String offlineTableName =
+          
TableNameBuilder.OFFLINE.tableNameWithType(TableNameBuilder.extractRawTableName(realtimeTableName));
+      Set<String> existingOfflineTableSegmentNames =
+          new 
HashSet<>(_clusterInfoAccessor.getPinotHelixResourceManager().getSegmentsFor(offlineTableName,
 true));
+
+      // In-case of previous minion task failures, get info
+      // of failed minion subtasks. They need to be reprocessed.
+      Set<String> failedTaskInputSegments =
+          getFailedTaskSegments(realtimeToOfflineSegmentsTaskMetadata, 
existingOfflineTableSegmentNames);
+
+      // In-case of partial failure of segments upload in prev minion task run,
+      // data is inconsistent, delete the corresponding offline segments 
immediately.
+      if (!failedTaskInputSegments.isEmpty()) {
+        deleteInvalidOfflineSegments(offlineTableName, 
failedTaskInputSegments, existingOfflineTableSegmentNames,
+            realtimeToOfflineSegmentsTaskMetadata);
+      }
+
+      List<SegmentZKMetadata> segmentsToBeReProcessed =
+          filterOutRemovedSegments(failedTaskInputSegments, 
completedRealtimeSegmentsZKMetadata);
+
+      // if no segment to be reprocessed, no failure
+      boolean prevMinionTaskSuccessful = segmentsToBeReProcessed.isEmpty();
+
+      List<List<String>> segmentNamesGroupList = new ArrayList<>();
+      Map<String, String> segmentNameVsDownloadURL = new HashMap<>();
+
+      // maxNumRecordsPerTask is used to divide a minion tasks among
+      // multiple subtasks to improve performance.
+      int maxNumRecordsPerTask =
+          
taskConfigs.get(MinionConstants.RealtimeToOfflineSegmentsTask.MAX_NUM_RECORDS_PER_TASK_KEY)
 != null
+              ? Integer.parseInt(
+              
taskConfigs.get(MinionConstants.RealtimeToOfflineSegmentsTask.MAX_NUM_RECORDS_PER_TASK_KEY))
+              : DEFAULT_MAX_NUM_RECORDS_PER_TASK;
+
+      List<SegmentZKMetadata> segmentsToBeScheduled;
+
+      if (!prevMinionTaskSuccessful) {
+        segmentsToBeScheduled = segmentsToBeReProcessed;
+      } else {
+        // if all offline segments of prev minion tasks were successfully 
uploaded,
+        // we can clear the state of prev minion tasks as now it's useless.
+        if 
(!realtimeToOfflineSegmentsTaskMetadata.getSegmentNameToExpectedSubtaskResultID().
+            isEmpty()) {
+          
realtimeToOfflineSegmentsTaskMetadata.getSegmentNameToExpectedSubtaskResultID().clear();
+          
realtimeToOfflineSegmentsTaskMetadata.getExpectedSubtaskResultMap().clear();
+          // windowEndTime of prev minion task needs to be re-used for picking 
up the
+          // next windowStartTime. This is useful for case where user changes 
minion config
+          // after a minion task run was complete. So windowStartTime cannot 
be watermark + bucketMs
+          windowStartMs = 
realtimeToOfflineSegmentsTaskMetadata.getWindowEndMs();
         }
+        long windowEndMs = windowStartMs + bucketMs;
+        // since window changed, pick new segments.
+        segmentsToBeScheduled =
+            generateNewSegmentsToProcess(completedRealtimeSegmentsZKMetadata, 
windowStartMs, windowEndMs, bucketMs,
+                bufferMs, bufferTimePeriod, lastLLCSegmentPerPartition, 
realtimeToOfflineSegmentsTaskMetadata);
+      }
 
-        for (SegmentZKMetadata segmentZKMetadata : 
completedSegmentsZKMetadata) {
-          String segmentName = segmentZKMetadata.getSegmentName();
-          long segmentStartTimeMs = segmentZKMetadata.getStartTimeMs();
-          long segmentEndTimeMs = segmentZKMetadata.getEndTimeMs();
-
-          // Check overlap with window
-          if (windowStartMs <= segmentEndTimeMs && segmentStartTimeMs < 
windowEndMs) {
-            // If last completed segment is being used, make sure that segment 
crosses over end of window.
-            // In the absence of this check, CONSUMING segments could contain 
some portion of the window. That data
-            // would be skipped forever.
-            if (lastLLCSegmentPerPartition.contains(segmentName) && 
segmentEndTimeMs < windowEndMs) {
-              LOGGER.info("Window data overflows into CONSUMING segments for 
partition of segment: {}. Skipping task "
-                  + "generation: {}", segmentName, taskType);
-              skipGenerate = true;
-              break;
-            }
-            segmentNames.add(segmentName);
-            downloadURLs.add(segmentZKMetadata.getDownloadUrl());
+      divideSegmentsAmongSubtasks(segmentsToBeScheduled, 
segmentNamesGroupList, segmentNameVsDownloadURL,
+          maxNumRecordsPerTask);
+
+      if (segmentNamesGroupList.isEmpty()) {
+        continue;
+      }
+
+      List<PinotTaskConfig> pinotTaskConfigsForTable = new ArrayList<>();
+      long newWindowStartTime = 
realtimeToOfflineSegmentsTaskMetadata.getWindowStartMs();
+      long newWindowEndTime = 
realtimeToOfflineSegmentsTaskMetadata.getWindowEndMs();
+
+      LOGGER.info(
+          "generating tasks for: {} with window start time: {}, window end 
time: {}, table: {}", taskType,
+          windowStartMs,
+          newWindowEndTime, realtimeTableName);
+
+      for (List<String> segmentNameList : segmentNamesGroupList) {
+        List<String> downloadURLList = getDownloadURLList(segmentNameList, 
segmentNameVsDownloadURL);
+        Preconditions.checkState(segmentNameList.size() == 
downloadURLList.size());
+        pinotTaskConfigsForTable.add(
+            createPinotTaskConfig(segmentNameList, downloadURLList, 
realtimeTableName, taskConfigs, tableConfig,
+                newWindowStartTime,
+                newWindowEndTime, taskType));
+      }
+      try {
+        _clusterInfoAccessor
+            .setMinionTaskMetadata(realtimeToOfflineSegmentsTaskMetadata,
+                MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
+                expectedVersion);
+      } catch (ZkBadVersionException e) {
+        LOGGER.error(
+            "Version changed while updating RTO task metadata for table: {}, 
skip scheduling. There are "
+                + "multiple task schedulers for the same table, need to 
investigate!", realtimeTableName);
+        // skip this table for this minion run
+        continue;
+      }
+
+      pinotTaskConfigs.addAll(pinotTaskConfigsForTable);
+
+      LOGGER.info("Finished generating task configs for table: {} for task: 
{}", realtimeTableName, taskType);
+    }
+    return pinotTaskConfigs;
+  }
+
+  @Override
+  public void validateTaskConfigs(TableConfig tableConfig, Map<String, String> 
taskConfigs) {
+    // check table is not upsert
+    Preconditions.checkState(tableConfig.getUpsertMode() == 
UpsertConfig.Mode.NONE,
+        "RealtimeToOfflineTask doesn't support upsert table!");
+    // check no malformed period
+    TimeUtils.convertPeriodToMillis(
+        
taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUFFER_TIME_PERIOD_KEY, 
"2d"));
+    TimeUtils.convertPeriodToMillis(
+        
taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUCKET_TIME_PERIOD_KEY, 
"1d"));
+    TimeUtils.convertPeriodToMillis(
+        
taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.ROUND_BUCKET_TIME_PERIOD_KEY,
 "1s"));
+    // check mergeType is correct
+    Preconditions.checkState(ImmutableSet.of(MergeType.CONCAT.name(), 
MergeType.ROLLUP.name(), MergeType.DEDUP.name())
+        
.contains(taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.MERGE_TYPE_KEY,
 MergeType.CONCAT.name())
+            .toUpperCase()), "MergeType must be one of [CONCAT, ROLLUP, 
DEDUP]!");
+
+    Schema schema = 
_clusterInfoAccessor.getPinotHelixResourceManager().getSchemaForTableConfig(tableConfig);
+    // check no mis-configured columns
+    Set<String> columnNames = schema.getColumnNames();
+    for (Map.Entry<String, String> entry : taskConfigs.entrySet()) {
+      if (entry.getKey().endsWith(".aggregationType")) {
+        Preconditions.checkState(columnNames.contains(
+                StringUtils.removeEnd(entry.getKey(), 
RealtimeToOfflineSegmentsTask.AGGREGATION_TYPE_KEY_SUFFIX)),
+            String.format("Column \"%s\" not found in schema!", 
entry.getKey()));
+        try {
+          // check that it's a valid aggregation function type
+          AggregationFunctionType aft = 
AggregationFunctionType.getAggregationFunctionType(entry.getValue());
+          // check that a value aggregator is available
+          if 
(!MinionConstants.RealtimeToOfflineSegmentsTask.AVAILABLE_CORE_VALUE_AGGREGATORS.contains(aft))
 {
+            throw new IllegalArgumentException("ValueAggregator not enabled 
for type: " + aft.toString());
           }
+        } catch (IllegalArgumentException e) {
+          String err =
+              String.format("Column \"%s\" has invalid aggregate type: %s", 
entry.getKey(), entry.getValue());
+          throw new IllegalStateException(err);
         }
-        if (skipGenerate || !segmentNames.isEmpty()) {
-          break;
-        }
+      }
+    }
+  }
 
-        LOGGER.info("Found no eligible segments for task: {} with window [{} - 
{}), moving to the next time bucket",
-            taskType, windowStartMs, windowEndMs);
-        windowStartMs = windowEndMs;
-        windowEndMs += bucketMs;
+  private List<String> getDownloadURLList(List<String> segmentNameList, 
Map<String, String> segmentNameVsDownloadURL) {
+    List<String> downloadURLList = new ArrayList<>();
+    for (String segmentName : segmentNameList) {
+      downloadURLList.add(segmentNameVsDownloadURL.get(segmentName));
+    }
+    return downloadURLList;
+  }
+
+  private void deleteInvalidOfflineSegments(String offlineTableName,
+      Set<String> realtimeSegmentsToBeReProcessed,
+      Set<String> existingOfflineTableSegmentNames,
+      RealtimeToOfflineSegmentsTaskMetadata 
realtimeToOfflineSegmentsTaskMetadata) {
+
+    Map<String, String> segmentNameToExpectedSubtaskResultID =
+        
realtimeToOfflineSegmentsTaskMetadata.getSegmentNameToExpectedSubtaskResultID();
+    Map<String, ExpectedSubtaskResult> expectedSubtaskResultMap =
+        realtimeToOfflineSegmentsTaskMetadata.getExpectedSubtaskResultMap();
+
+    Set<String> segmentsToBeDeleted = new HashSet<>();
+
+    for (String realtimeSegmentName : realtimeSegmentsToBeReProcessed) {
+      String id = 
segmentNameToExpectedSubtaskResultID.get(realtimeSegmentName);
+      Preconditions.checkNotNull(id);
+      ExpectedSubtaskResult expectedSubtaskResult =
+          expectedSubtaskResultMap.get(id);
+      // if already marked as failure, no need to delete again.
+      if (expectedSubtaskResult.isTaskFailure()) {

Review Comment:
   Do we need this state? Can we not use the offline table as the source of truth for whether a segment exists or not?
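
   For illustration, a minimal sketch of that alternative, assuming getSegmentsTo() (used later in this diff) and the existingOfflineTableSegmentNames set fetched from the offline table above:

       import java.util.Set;

       // Hedged sketch: infer failure from the offline table itself rather than a
       // persisted _taskFailure flag. A subtask is treated as incomplete if any of
       // its expected offline segments is absent from the offline table.
       private static boolean isSubtaskIncomplete(ExpectedSubtaskResult expectedSubtaskResult,
           Set<String> existingOfflineTableSegmentNames) {
         for (String expectedOfflineSegmentName : expectedSubtaskResult.getSegmentsTo()) {
           if (!existingOfflineTableSegmentNames.contains(expectedOfflineSegmentName)) {
             return true;
           }
         }
         return false;
       }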



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java:
##########
@@ -147,97 +157,385 @@ public List<PinotTaskConfig> 
generateTasks(List<TableConfig> tableConfigs) {
       long bucketMs = TimeUtils.convertPeriodToMillis(bucketTimePeriod);
       long bufferMs = TimeUtils.convertPeriodToMillis(bufferTimePeriod);
 
-      // Get watermark from RealtimeToOfflineSegmentsTaskMetadata ZNode. 
WindowStart = watermark. WindowEnd =
-      // windowStart + bucket.
-      long windowStartMs = getWatermarkMs(realtimeTableName, 
completedSegmentsZKMetadata, bucketMs);
-      long windowEndMs = windowStartMs + bucketMs;
+      ZNRecord realtimeToOfflineZNRecord =
+          
_clusterInfoAccessor.getMinionTaskMetadataZNRecord(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
+              realtimeTableName);
+      int expectedVersion = realtimeToOfflineZNRecord != null ? 
realtimeToOfflineZNRecord.getVersion() : -1;
+      RealtimeToOfflineSegmentsTaskMetadata 
realtimeToOfflineSegmentsTaskMetadata =
+          getRTOTaskMetadata(realtimeTableName, 
completedRealtimeSegmentsZKMetadata, bucketMs,
+              realtimeToOfflineZNRecord);
+
+      // Get watermark from RealtimeToOfflineSegmentsTaskMetadata ZNode. 
WindowStart = watermark.
+      long windowStartMs = 
realtimeToOfflineSegmentsTaskMetadata.getWindowStartMs();
 
       // Find all COMPLETED segments with data overlapping execution window: 
windowStart (inclusive) to windowEnd
       // (exclusive)
-      List<String> segmentNames = new ArrayList<>();
-      List<String> downloadURLs = new ArrayList<>();
       Set<String> lastLLCSegmentPerPartition = new 
HashSet<>(partitionToLatestLLCSegmentName.values());
-      boolean skipGenerate = false;
-      while (true) {
-        // Check that execution window is older than bufferTime
-        if (windowEndMs > System.currentTimeMillis() - bufferMs) {
-          LOGGER.info(
-              "Window with start: {} and end: {} is not older than buffer 
time: {} configured as {} ago. Skipping task "
-                  + "generation: {}", windowStartMs, windowEndMs, bufferMs, 
bufferTimePeriod, taskType);
-          skipGenerate = true;
-          break;
+
+      // Get all offline table segments.
+      // These are used to validate if previous minion task was successful or 
not
+      String offlineTableName =
+          
TableNameBuilder.OFFLINE.tableNameWithType(TableNameBuilder.extractRawTableName(realtimeTableName));
+      Set<String> existingOfflineTableSegmentNames =
+          new 
HashSet<>(_clusterInfoAccessor.getPinotHelixResourceManager().getSegmentsFor(offlineTableName,
 true));
+
+      // In-case of previous minion task failures, get info
+      // of failed minion subtasks. They need to be reprocessed.
+      Set<String> failedTaskInputSegments =
+          getFailedTaskSegments(realtimeToOfflineSegmentsTaskMetadata, 
existingOfflineTableSegmentNames);
+
+      // In-case of partial failure of segments upload in prev minion task run,
+      // data is inconsistent, delete the corresponding offline segments 
immediately.
+      if (!failedTaskInputSegments.isEmpty()) {
+        deleteInvalidOfflineSegments(offlineTableName, 
failedTaskInputSegments, existingOfflineTableSegmentNames,
+            realtimeToOfflineSegmentsTaskMetadata);
+      }
+
+      List<SegmentZKMetadata> segmentsToBeReProcessed =
+          filterOutRemovedSegments(failedTaskInputSegments, 
completedRealtimeSegmentsZKMetadata);
+
+      // if no segment to be reprocessed, no failure
+      boolean prevMinionTaskSuccessful = segmentsToBeReProcessed.isEmpty();
+
+      List<List<String>> segmentNamesGroupList = new ArrayList<>();
+      Map<String, String> segmentNameVsDownloadURL = new HashMap<>();
+
+      // maxNumRecordsPerTask is used to divide a minion tasks among
+      // multiple subtasks to improve performance.
+      int maxNumRecordsPerTask =
+          
taskConfigs.get(MinionConstants.RealtimeToOfflineSegmentsTask.MAX_NUM_RECORDS_PER_TASK_KEY)
 != null
+              ? Integer.parseInt(
+              
taskConfigs.get(MinionConstants.RealtimeToOfflineSegmentsTask.MAX_NUM_RECORDS_PER_TASK_KEY))
+              : DEFAULT_MAX_NUM_RECORDS_PER_TASK;
+
+      List<SegmentZKMetadata> segmentsToBeScheduled;
+
+      if (!prevMinionTaskSuccessful) {
+        segmentsToBeScheduled = segmentsToBeReProcessed;
+      } else {
+        // if all offline segments of prev minion tasks were successfully 
uploaded,
+        // we can clear the state of prev minion tasks as now it's useless.
+        if 
(!realtimeToOfflineSegmentsTaskMetadata.getSegmentNameToExpectedSubtaskResultID().
+            isEmpty()) {
+          
realtimeToOfflineSegmentsTaskMetadata.getSegmentNameToExpectedSubtaskResultID().clear();
+          
realtimeToOfflineSegmentsTaskMetadata.getExpectedSubtaskResultMap().clear();
+          // windowEndTime of prev minion task needs to be re-used for picking 
up the
+          // next windowStartTime. This is useful for case where user changes 
minion config
+          // after a minion task run was complete. So windowStartTime cannot 
be watermark + bucketMs
+          windowStartMs = 
realtimeToOfflineSegmentsTaskMetadata.getWindowEndMs();
         }
+        long windowEndMs = windowStartMs + bucketMs;
+        // since window changed, pick new segments.
+        segmentsToBeScheduled =
+            generateNewSegmentsToProcess(completedRealtimeSegmentsZKMetadata, 
windowStartMs, windowEndMs, bucketMs,
+                bufferMs, bufferTimePeriod, lastLLCSegmentPerPartition, 
realtimeToOfflineSegmentsTaskMetadata);
+      }
 
-        for (SegmentZKMetadata segmentZKMetadata : 
completedSegmentsZKMetadata) {
-          String segmentName = segmentZKMetadata.getSegmentName();
-          long segmentStartTimeMs = segmentZKMetadata.getStartTimeMs();
-          long segmentEndTimeMs = segmentZKMetadata.getEndTimeMs();
-
-          // Check overlap with window
-          if (windowStartMs <= segmentEndTimeMs && segmentStartTimeMs < 
windowEndMs) {
-            // If last completed segment is being used, make sure that segment 
crosses over end of window.
-            // In the absence of this check, CONSUMING segments could contain 
some portion of the window. That data
-            // would be skipped forever.
-            if (lastLLCSegmentPerPartition.contains(segmentName) && 
segmentEndTimeMs < windowEndMs) {
-              LOGGER.info("Window data overflows into CONSUMING segments for 
partition of segment: {}. Skipping task "
-                  + "generation: {}", segmentName, taskType);
-              skipGenerate = true;
-              break;
-            }
-            segmentNames.add(segmentName);
-            downloadURLs.add(segmentZKMetadata.getDownloadUrl());
+      divideSegmentsAmongSubtasks(segmentsToBeScheduled, 
segmentNamesGroupList, segmentNameVsDownloadURL,
+          maxNumRecordsPerTask);
+
+      if (segmentNamesGroupList.isEmpty()) {
+        continue;
+      }
+
+      List<PinotTaskConfig> pinotTaskConfigsForTable = new ArrayList<>();
+      long newWindowStartTime = 
realtimeToOfflineSegmentsTaskMetadata.getWindowStartMs();
+      long newWindowEndTime = 
realtimeToOfflineSegmentsTaskMetadata.getWindowEndMs();
+
+      LOGGER.info(
+          "generating tasks for: {} with window start time: {}, window end 
time: {}, table: {}", taskType,
+          windowStartMs,
+          newWindowEndTime, realtimeTableName);
+
+      for (List<String> segmentNameList : segmentNamesGroupList) {
+        List<String> downloadURLList = getDownloadURLList(segmentNameList, 
segmentNameVsDownloadURL);
+        Preconditions.checkState(segmentNameList.size() == 
downloadURLList.size());
+        pinotTaskConfigsForTable.add(
+            createPinotTaskConfig(segmentNameList, downloadURLList, 
realtimeTableName, taskConfigs, tableConfig,
+                newWindowStartTime,
+                newWindowEndTime, taskType));
+      }
+      try {
+        _clusterInfoAccessor
+            .setMinionTaskMetadata(realtimeToOfflineSegmentsTaskMetadata,
+                MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
+                expectedVersion);
+      } catch (ZkBadVersionException e) {
+        LOGGER.error(
+            "Version changed while updating RTO task metadata for table: {}, 
skip scheduling. There are "
+                + "multiple task schedulers for the same table, need to 
investigate!", realtimeTableName);
+        // skip this table for this minion run
+        continue;
+      }
+
+      pinotTaskConfigs.addAll(pinotTaskConfigsForTable);
+
+      LOGGER.info("Finished generating task configs for table: {} for task: 
{}", realtimeTableName, taskType);
+    }
+    return pinotTaskConfigs;
+  }
+
+  @Override
+  public void validateTaskConfigs(TableConfig tableConfig, Map<String, String> 
taskConfigs) {
+    // check table is not upsert
+    Preconditions.checkState(tableConfig.getUpsertMode() == 
UpsertConfig.Mode.NONE,
+        "RealtimeToOfflineTask doesn't support upsert table!");
+    // check no malformed period
+    TimeUtils.convertPeriodToMillis(
+        
taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUFFER_TIME_PERIOD_KEY, 
"2d"));
+    TimeUtils.convertPeriodToMillis(
+        
taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUCKET_TIME_PERIOD_KEY, 
"1d"));
+    TimeUtils.convertPeriodToMillis(
+        
taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.ROUND_BUCKET_TIME_PERIOD_KEY,
 "1s"));
+    // check mergeType is correct
+    Preconditions.checkState(ImmutableSet.of(MergeType.CONCAT.name(), 
MergeType.ROLLUP.name(), MergeType.DEDUP.name())
+        
.contains(taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.MERGE_TYPE_KEY,
 MergeType.CONCAT.name())
+            .toUpperCase()), "MergeType must be one of [CONCAT, ROLLUP, 
DEDUP]!");
+
+    Schema schema = 
_clusterInfoAccessor.getPinotHelixResourceManager().getSchemaForTableConfig(tableConfig);
+    // check no mis-configured columns
+    Set<String> columnNames = schema.getColumnNames();
+    for (Map.Entry<String, String> entry : taskConfigs.entrySet()) {
+      if (entry.getKey().endsWith(".aggregationType")) {
+        Preconditions.checkState(columnNames.contains(
+                StringUtils.removeEnd(entry.getKey(), 
RealtimeToOfflineSegmentsTask.AGGREGATION_TYPE_KEY_SUFFIX)),
+            String.format("Column \"%s\" not found in schema!", 
entry.getKey()));
+        try {
+          // check that it's a valid aggregation function type
+          AggregationFunctionType aft = 
AggregationFunctionType.getAggregationFunctionType(entry.getValue());
+          // check that a value aggregator is available
+          if 
(!MinionConstants.RealtimeToOfflineSegmentsTask.AVAILABLE_CORE_VALUE_AGGREGATORS.contains(aft))
 {
+            throw new IllegalArgumentException("ValueAggregator not enabled 
for type: " + aft.toString());
           }
+        } catch (IllegalArgumentException e) {
+          String err =
+              String.format("Column \"%s\" has invalid aggregate type: %s", 
entry.getKey(), entry.getValue());
+          throw new IllegalStateException(err);
         }
-        if (skipGenerate || !segmentNames.isEmpty()) {
-          break;
-        }
+      }
+    }
+  }
 
-        LOGGER.info("Found no eligible segments for task: {} with window [{} - 
{}), moving to the next time bucket",
-            taskType, windowStartMs, windowEndMs);
-        windowStartMs = windowEndMs;
-        windowEndMs += bucketMs;
+  private List<String> getDownloadURLList(List<String> segmentNameList, 
Map<String, String> segmentNameVsDownloadURL) {
+    List<String> downloadURLList = new ArrayList<>();
+    for (String segmentName : segmentNameList) {
+      downloadURLList.add(segmentNameVsDownloadURL.get(segmentName));
+    }
+    return downloadURLList;
+  }
+
+  private void deleteInvalidOfflineSegments(String offlineTableName,
+      Set<String> realtimeSegmentsToBeReProcessed,
+      Set<String> existingOfflineTableSegmentNames,
+      RealtimeToOfflineSegmentsTaskMetadata 
realtimeToOfflineSegmentsTaskMetadata) {
+
+    Map<String, String> segmentNameToExpectedSubtaskResultID =
+        
realtimeToOfflineSegmentsTaskMetadata.getSegmentNameToExpectedSubtaskResultID();
+    Map<String, ExpectedSubtaskResult> expectedSubtaskResultMap =
+        realtimeToOfflineSegmentsTaskMetadata.getExpectedSubtaskResultMap();
+
+    Set<String> segmentsToBeDeleted = new HashSet<>();
+
+    for (String realtimeSegmentName : realtimeSegmentsToBeReProcessed) {
+      String id = 
segmentNameToExpectedSubtaskResultID.get(realtimeSegmentName);
+      Preconditions.checkNotNull(id);
+      ExpectedSubtaskResult expectedSubtaskResult =
+          expectedSubtaskResultMap.get(id);
+      // if already marked as failure, no need to delete again.
+      if (expectedSubtaskResult.isTaskFailure()) {
+        continue;
       }
+      List<String> expectedCorrespondingOfflineSegments = 
expectedSubtaskResult.getSegmentsTo();
+      segmentsToBeDeleted.addAll(
+          getSegmentsToDelete(expectedCorrespondingOfflineSegments, 
existingOfflineTableSegmentNames));
+      // The expectedRealtimeToOfflineTaskResult is confirmed to be
+      // related to a failed task. Mark it as a failure, since executor will
+      // then only replace expectedRealtimeToOfflineTaskResult for the
+      // segments to be reprocessed.
+      expectedSubtaskResult.setTaskFailure();
+    }
+
+    if (!segmentsToBeDeleted.isEmpty()) {

Review Comment:
   It can fail here as well, I assume, but we would not re-issue deletes for already-deleted segments. Can you confirm that the actual deep-store deletion happens asynchronously and that only metadata (ZK, ideal state) gets removed here? This is to make sure that this step does not hog the controller.
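
   To make the idempotency point concrete, a sketch of the delete step being discussed; the batched deleteSegments(...) call on the Helix resource manager, the expectedOfflineSegments variable, and the deferred deep-store cleanup are assumptions that this comment asks to confirm:

       // Only issue deletes for expected offline segments that still exist in the
       // offline table, so a re-run does not re-delete already-removed segments.
       Set<String> segmentsToBeDeleted = new HashSet<>(expectedOfflineSegments);
       segmentsToBeDeleted.retainAll(existingOfflineTableSegmentNames);
       if (!segmentsToBeDeleted.isEmpty()) {
         // Assumed to remove only metadata (ZK segment metadata, ideal state) inline,
         // with deep-store file cleanup handled asynchronously by retention.
         pinotHelixResourceManager.deleteSegments(offlineTableName, new ArrayList<>(segmentsToBeDeleted));
       }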
   



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java:
##########
@@ -287,77 +585,69 @@ private void getCompletedSegmentsInfo(String 
realtimeTableName, List<SegmentZKMe
    * If the znode is null, computes the watermark using either the start time 
config or the start time from segment
    * metadata
    */
-  private long getWatermarkMs(String realtimeTableName, 
List<SegmentZKMetadata> completedSegmentsZKMetadata,
-      long bucketMs) {
-    ZNRecord realtimeToOfflineZNRecord =
-        
_clusterInfoAccessor.getMinionTaskMetadataZNRecord(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
-            realtimeTableName);
-    RealtimeToOfflineSegmentsTaskMetadata 
realtimeToOfflineSegmentsTaskMetadata =
-        realtimeToOfflineZNRecord != null ? 
RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(
-            realtimeToOfflineZNRecord) : null;
-
-    if (realtimeToOfflineSegmentsTaskMetadata == null) {
-      // No ZNode exists. Cold-start.
-      long watermarkMs;
-
-      // Find the smallest time from all segments
-      long minStartTimeMs = Long.MAX_VALUE;
-      for (SegmentZKMetadata segmentZKMetadata : completedSegmentsZKMetadata) {
-        minStartTimeMs = Math.min(minStartTimeMs, 
segmentZKMetadata.getStartTimeMs());
-      }
-      Preconditions.checkState(minStartTimeMs != Long.MAX_VALUE);
+  private RealtimeToOfflineSegmentsTaskMetadata getRTOTaskMetadata(String 
realtimeTableName,
+      List<SegmentZKMetadata> completedSegmentsZKMetadata,
+      long bucketMs, ZNRecord realtimeToOfflineZNRecord) {
+
+    if (realtimeToOfflineZNRecord != null) {
+      return RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(
+          realtimeToOfflineZNRecord);
+    }
 
-      // Round off according to the bucket. This ensures we align the offline 
segments to proper time boundaries
-      // For example, if start time millis is 20200813T12:34:59, we want to 
create the first segment for window
-      // [20200813, 20200814)
-      watermarkMs = (minStartTimeMs / bucketMs) * bucketMs;
+    // No ZNode exists. Cold-start.
+    long watermarkMs;
 
-      // Create RealtimeToOfflineSegmentsTaskMetadata ZNode using watermark 
calculated above
-      realtimeToOfflineSegmentsTaskMetadata = new 
RealtimeToOfflineSegmentsTaskMetadata(realtimeTableName, watermarkMs);
-      
_clusterInfoAccessor.setMinionTaskMetadata(realtimeToOfflineSegmentsTaskMetadata,
-          MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, -1);
+    // Find the smallest time from all segments
+    long minStartTimeMs = Long.MAX_VALUE;
+    for (SegmentZKMetadata segmentZKMetadata : completedSegmentsZKMetadata) {
+      minStartTimeMs = Math.min(minStartTimeMs, 
segmentZKMetadata.getStartTimeMs());
     }
-    return realtimeToOfflineSegmentsTaskMetadata.getWatermarkMs();
-  }
+    Preconditions.checkState(minStartTimeMs != Long.MAX_VALUE);
 
-  @Override
-  public void validateTaskConfigs(TableConfig tableConfig, Map<String, String> 
taskConfigs) {
-    // check table is not upsert
-    Preconditions.checkState(tableConfig.getUpsertMode() == 
UpsertConfig.Mode.NONE,
-        "RealtimeToOfflineTask doesn't support upsert table!");
-    // check no malformed period
-    TimeUtils.convertPeriodToMillis(
-        
taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUFFER_TIME_PERIOD_KEY, 
"2d"));
-    TimeUtils.convertPeriodToMillis(
-        
taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUCKET_TIME_PERIOD_KEY, 
"1d"));
-    TimeUtils.convertPeriodToMillis(
-        
taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.ROUND_BUCKET_TIME_PERIOD_KEY,
 "1s"));
-    // check mergeType is correct
-    Preconditions.checkState(ImmutableSet.of(MergeType.CONCAT.name(), 
MergeType.ROLLUP.name(), MergeType.DEDUP.name())
-        
.contains(taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.MERGE_TYPE_KEY,
 MergeType.CONCAT.name())
-            .toUpperCase()), "MergeType must be one of [CONCAT, ROLLUP, 
DEDUP]!");
+    // Round off according to the bucket. This ensures we align the offline 
segments to proper time boundaries
+    // For example, if start time millis is 20200813T12:34:59, we want to 
create the first segment for window
+    // [20200813, 20200814)
+    watermarkMs = (minStartTimeMs / bucketMs) * bucketMs;
 
-    Schema schema = 
_clusterInfoAccessor.getPinotHelixResourceManager().getSchemaForTableConfig(tableConfig);
-    // check no mis-configured columns
-    Set<String> columnNames = schema.getColumnNames();
+    return new RealtimeToOfflineSegmentsTaskMetadata(realtimeTableName, 
watermarkMs);
+  }
+
+  private PinotTaskConfig createPinotTaskConfig(List<String> segmentNameList, 
List<String> downloadURLList,
+      String realtimeTableName, Map<String, String> taskConfigs, TableConfig 
tableConfig, long windowStartMs,
+      long windowEndMs, String taskType) {
+
+    Map<String, String> configs = 
MinionTaskUtils.getPushTaskConfig(realtimeTableName, taskConfigs,
+        _clusterInfoAccessor);
+    configs.putAll(getBaseTaskConfigs(tableConfig, segmentNameList));
+    configs.put(MinionConstants.DOWNLOAD_URL_KEY, 
StringUtils.join(downloadURLList, MinionConstants.URL_SEPARATOR));

Review Comment:
   Should the order of the download URL list match segmentNameList, so that the minion correctly pairs each segment with its download URL?
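
   A small sketch of how the pairing can be kept explicit: building the URL list in the same iteration over segmentNameList (as the generator's getDownloadURLList does) keeps the two lists index-aligned, and a null check fails fast on a missing URL. Guava Preconditions and the segmentNameVsDownloadURL map from this diff are assumed to be in scope:

       // segmentNameVsDownloadURL is the name -> URL map built by the generator.
       List<String> orderedDownloadUrls = new ArrayList<>(segmentNameList.size());
       for (String segmentName : segmentNameList) {
         String downloadUrl = segmentNameVsDownloadURL.get(segmentName);
         Preconditions.checkState(downloadUrl != null, "Missing download URL for segment: %s", segmentName);
         // The i-th URL corresponds to the i-th segment name, so joining both lists
         // with MinionConstants.URL_SEPARATOR preserves the pairing.
         orderedDownloadUrls.add(downloadUrl);
       }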



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

