Jackie-Jiang commented on a change in pull request #6124:
URL: https://github.com/apache/incubator-pinot/pull/6124#discussion_r505756502



##########
File path: 
pinot-common/src/main/java/org/apache/pinot/common/minion/RealtimeToOfflineSegmentsTaskMetadata.java
##########
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.minion;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.helix.ZNRecord;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+/**
+ * Metadata for the minion task of type <code>realtimeToOfflineSegmentsTask</code>.
+ * The <code>watermarkMillis</code> denotes the time (exclusive) up to which tasks have been executed.
+ *
+ * This gets serialized and stored in ZooKeeper under the path MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/tableNameWithType
+ *
+ * PinotTaskGenerator:
+ * The <code>watermarkMillis</code> is used by the <code>RealtimeToOfflineSegmentsTaskGenerator</code>,
+ * to determine the window of execution for the task it is generating.
+ * The window of execution will be [watermarkMillis, watermarkMillis + bucketSize)
+ *
+ * PinotTaskExecutor:
+ * The same watermark is used by the <code>RealtimeToOfflineSegmentsTaskExecutor</code>, to:
+ * - Verify that it is running the latest task scheduled by the task generator
+ * - Update the watermark as the end of the window that it executed for
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)

Review comment:
       This class does not need to be JSON-compatible, since we always read and
write it through `ZNRecord`.
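
To illustrate the suggestion, here is a minimal sketch of a metadata class that converts to and from a record without any Jackson annotations. `RecordSketch` is a hypothetical stand-in for Helix's `ZNRecord` (reduced to an id and long-valued simple fields) so the example runs on its own; `TaskMetadataSketch` and its method names are likewise assumptions, not the code in this PR.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Helix's ZNRecord: an id plus string-valued simple fields.
final class RecordSketch {
  private final String _id;
  private final Map<String, String> _simpleFields = new HashMap<>();

  RecordSketch(String id) {
    _id = id;
  }

  String getId() {
    return _id;
  }

  void setLongField(String key, long value) {
    _simpleFields.put(key, Long.toString(value));
  }

  long getLongField(String key, long defaultValue) {
    String value = _simpleFields.get(key);
    return value == null ? defaultValue : Long.parseLong(value);
  }
}

// Plain value class: no JSON annotations, all (de)serialization goes through the record.
final class TaskMetadataSketch {
  private static final String WATERMARK_KEY = "watermarkMillis";

  private final String _tableNameWithType;
  private final long _watermarkMillis;

  TaskMetadataSketch(String tableNameWithType, long watermarkMillis) {
    _tableNameWithType = tableNameWithType;
    _watermarkMillis = watermarkMillis;
  }

  String getTableNameWithType() {
    return _tableNameWithType;
  }

  long getWatermarkMillis() {
    return _watermarkMillis;
  }

  // Rebuilds the metadata from a record; the record id carries the table name.
  static TaskMetadataSketch fromRecord(RecordSketch record) {
    return new TaskMetadataSketch(record.getId(), record.getLongField(WATERMARK_KEY, 0L));
  }

  // Writes the metadata into a fresh record keyed by the table name.
  RecordSketch toRecord() {
    RecordSketch record = new RecordSketch(_tableNameWithType);
    record.setLongField(WATERMARK_KEY, _watermarkMillis);
    return record;
  }
}
```

With this shape, the only serialization contract is the pair `fromRecord` / `toRecord`, so annotations such as `@JsonIgnoreProperties` have nothing left to configure.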

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoProvider.java
##########
@@ -93,6 +97,38 @@ public Schema getTableSchema(String tableName) {
         .getRealtimeSegmentZKMetadataListForTable(_pinotHelixResourceManager.getPropertyStore(), tableName);
   }
 
+  /**
+   * Get all segment metadata for the given lowlevel REALTIME table name.
+   *
+   * @param tableName Table name with or without REALTIME type suffix
+   * @return List of segment metadata
+   */
+  public List<LLCRealtimeSegmentZKMetadata> getLLCRealtimeSegmentsMetadata(String tableName) {
+    return ZKMetadataProvider
+        .getLLCRealtimeSegmentZKMetadataListForTable(_pinotHelixResourceManager.getPropertyStore(), tableName);
+  }
+
+  /**
+   * Fetches the {@link RealtimeToOfflineSegmentsTaskMetadata} from MINION_TASK_METADATA for given realtime table
+   * @param tableNameWithType realtime table name
+   */
+  public RealtimeToOfflineSegmentsTaskMetadata getMinionRealtimeToOfflineSegmentsTaskMetadata(
+      String tableNameWithType) {
+    return MinionTaskMetadataUtils
+        .getRealtimeToOfflineSegmentsTaskMetadata(_pinotHelixResourceManager.getPropertyStore(),
+            MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, tableNameWithType);
+  }
+
+  /**
+   * Sets the {@link RealtimeToOfflineSegmentsTaskMetadata} into MINION_TASK_METADATA
+   * This call will override any previous metadata node
+   */
+  public void setRealtimeToOfflineSegmentsTaskMetadata(

Review comment:
       Let's not put write methods in this class. It should only contain
methods for reading metadata.

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
##########
@@ -0,0 +1,308 @@
+package org.apache.pinot.controller.helix.core.minion.generator;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
+import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
+import org.apache.pinot.common.utils.CommonConstants.Segment;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A {@link PinotTaskGenerator} implementation for generating tasks of type {@link RealtimeToOfflineSegmentsTask}
+ *
+ * These will be generated only for REALTIME tables.
+ * At any given time, only 1 task of this type should be generated for a table.
+ *
+ * Steps:
+ *  - The watermarkMillis is read from the {@link RealtimeToOfflineSegmentsTaskMetadata} ZNode found at MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/tableNameWithType
+ *      In case of cold-start, no ZNode will exist.
+ *      A new ZNode will be created, with watermarkMillis as the smallest time found in the COMPLETED segments (or using start time config)
+ *  - The execution window for the task is calculated as, windowStartMillis = watermarkMillis, windowEndMillis = windowStartMillis + bucketTimeMillis,
+ *      where bucketTime can be provided in the taskConfigs (default 1d)
+ *  - If the execution window is not older than bufferTimeMillis, no task will be generated,
+ *      where bufferTime can be provided in the taskConfigs (default 2d)
+ *  - Segment metadata is scanned for all COMPLETED segments, to pick those containing data in window [windowStartMillis, windowEndMillis)
+ *  - A PinotTaskConfig is created, with segment information, execution window, and any config specific to the task
+ */
+public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskGenerator.class);
+
+  private static final String DEFAULT_BUCKET_PERIOD = "1d";
+  private static final String DEFAULT_BUFFER_PERIOD = "2d";
+
+  private final ClusterInfoProvider _clusterInfoProvider;
+
+  public RealtimeToOfflineSegmentsTaskGenerator(ClusterInfoProvider clusterInfoProvider) {
+    _clusterInfoProvider = clusterInfoProvider;
+  }
+
+  @Override
+  public String getTaskType() {
+    return RealtimeToOfflineSegmentsTask.TASK_TYPE;
+  }
+
+  @Override
+  public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
+    String taskType = RealtimeToOfflineSegmentsTask.TASK_TYPE;
+    List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+
+    for (TableConfig tableConfig : tableConfigs) {
+      String tableName = tableConfig.getTableName();
+
+      if (tableConfig.getTableType() != TableType.REALTIME) {
+        LOGGER.warn("Skip generating task: {} for non-REALTIME table: {}", taskType, tableName);
+        continue;
+      }

Review comment:
       Should we also skip HLC tables here?

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapper.java
##########
@@ -101,14 +101,14 @@ public void map()
     while (segmentRecordReader.hasNext()) {
       reusableRow = segmentRecordReader.next(reusableRow);
 
-      // Record transformation
-      reusableRow = _recordTransformer.transformRecord(reusableRow);
-
       // Record filtering
       if (_recordFilter.filter(reusableRow)) {
         continue;
       }
 
+      // Record transformation

Review comment:
       Do we now filter before the transformation? Can a filter still be
applied to the transformed columns?
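
To make the concern concrete, here is a minimal sketch, assuming a row is a plain map, a transform that derives a hypothetical `days` column from `timeValue`, and a filter that reads that derived column (returning true when the row should be dropped). None of these names come from `SegmentMapper`; they only show how the ordering changes the outcome.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

final class MapperOrderSketch {
  private static final long MILLIS_PER_DAY = 86_400_000L;

  // Hypothetical transform: derives "days" (days since epoch) from "timeValue" (epoch millis).
  static final UnaryOperator<Map<String, Object>> TRANSFORM = row -> {
    Map<String, Object> out = new HashMap<>(row);
    out.put("days", ((Long) row.get("timeValue")) / MILLIS_PER_DAY);
    return out;
  };

  // Hypothetical filter on the derived column: true means "drop this row".
  // A row without the derived column cannot match, so it is not dropped.
  static final Predicate<Map<String, Object>> DROP = row -> {
    Object days = row.get("days");
    return days != null && (Long) days < 18493L;
  };

  // Transform first: the filter sees the derived column.
  static boolean keptWhenTransformFirst(Map<String, Object> row) {
    return !DROP.test(TRANSFORM.apply(row));
  }

  // Filter first: the derived column does not exist yet when the filter runs.
  static boolean keptWhenFilterFirst(Map<String, Object> row) {
    return !DROP.test(row);
  }
}
```

For a row with `timeValue = 1597708800000` (day 18492), the transform-first order drops the row, while the filter-first order keeps it because `days` is still absent when the filter runs.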

##########
File path: 
pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
##########
@@ -275,7 +275,7 @@ public void segmentMapperTest(String mapperId, SegmentMapperConfig segmentMapper
     SegmentMapperConfig config11 = new SegmentMapperConfig(_pinotSchema,
         new RecordTransformerConfig.Builder().setTransformFunctionsMap(transformFunctionMap).build(),
         new RecordFilterConfig.Builder().setRecordFilterType(RecordFilterFactory.RecordFilterType.FILTER_FUNCTION)
-            .setFilterFunction("Groovy({timeValue != 1597795200000}, timeValue)").build(), Lists.newArrayList(
+            .setFilterFunction("Groovy({timeValue < 1597795200000L|| timeValue >= 1597881600000}, timeValue)").build(), Lists.newArrayList(

Review comment:
       Is this related to this PR?

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
##########
@@ -62,16 +62,35 @@ private MinionConstants() {
     public static final String MERGED_SEGMENT_NAME_KEY = "mergedSegmentNameKey";
   }
 
+  /**
+   * Creates segments for the OFFLINE table, using completed segments from the corresponding REALTIME table
+   */
   public static class RealtimeToOfflineSegmentsTask {
     public static final String TASK_TYPE = "realtimeToOfflineSegmentsTask";
-    // window
+
+    /**
+     * The time window size for the task.
+     * e.g. if set to "1d", then task is scheduled to run for a 1 day window
+     */
+    public static final String BUCKET_TIME_PERIOD_KEY = "bucketTimePeriod";
+    /**
+     * The time period to wait before picking segments for this task
+     * e.g. if set to "2d", no task will be scheduled for a time window younger than 2 days
+     */
+    public static final String BUFFER_TIME_PERIOD_KEY = "bufferTimePeriod";
+    /**
+     * Config to manually provide start time for the very first task scheduled.
+     * In the absence of this config, the very first window start is calculated as min(start time of all completed segments)
+     */
+    public static final String START_TIME_MILLIS_KEY = "startTimeMillis";

Review comment:
       Do we need this config? It can cause inconsistent query results when the
first offline segments are pushed, because older real-time records that were
not merged will be skipped.
   Is there any use case where we want to configure this value?
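
A minimal sketch of the scenario, assuming the cold-start watermark is either the configured start time or the minimum start time across completed segments, as the Javadoc describes. `ColdStartSketch`, `SegmentRange`, and both method names are hypothetical, and any rounding the real `getWatermarkMillis` might apply is ignored here.

```java
import java.util.List;

final class ColdStartSketch {
  // Hypothetical stand-in for a completed segment's time range, in epoch millis.
  static final class SegmentRange {
    final long _startMillis;
    final long _endMillis;

    SegmentRange(long startMillis, long endMillis) {
      _startMillis = startMillis;
      _endMillis = endMillis;
    }
  }

  // Uses the configured start time when present, else min(start time of all completed segments).
  static long coldStartWatermark(List<SegmentRange> completedSegments, Long configuredStartMillis) {
    if (configuredStartMillis != null) {
      return configuredStartMillis;
    }
    long min = Long.MAX_VALUE;
    for (SegmentRange segment : completedSegments) {
      min = Math.min(min, segment._startMillis);
    }
    return min;
  }

  // True when some completed segment holds data older than the watermark,
  // i.e. records that no later window [watermark, watermark + bucket) will ever cover.
  static boolean leavesUnmovedRecords(List<SegmentRange> completedSegments, long watermarkMillis) {
    for (SegmentRange segment : completedSegments) {
      if (segment._startMillis < watermarkMillis) {
        return true;
      }
    }
    return false;
  }
}
```

With segments starting at 100 and 150, the derived watermark is 100 and nothing is left behind; a configured start of 180 leaves the records in [100, 180) only in the realtime table, which is the inconsistency raised above.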

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
##########
@@ -0,0 +1,308 @@
+package org.apache.pinot.controller.helix.core.minion.generator;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
+import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
+import org.apache.pinot.common.utils.CommonConstants.Segment;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A {@link PinotTaskGenerator} implementation for generating tasks of type {@link RealtimeToOfflineSegmentsTask}
+ *
+ * These will be generated only for REALTIME tables.
+ * At any given time, only 1 task of this type should be generated for a table.
+ *
+ * Steps:
+ *  - The watermarkMillis is read from the {@link RealtimeToOfflineSegmentsTaskMetadata} ZNode found at MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/tableNameWithType
+ *      In case of cold-start, no ZNode will exist.
+ *      A new ZNode will be created, with watermarkMillis as the smallest time found in the COMPLETED segments (or using start time config)
+ *  - The execution window for the task is calculated as, windowStartMillis = watermarkMillis, windowEndMillis = windowStartMillis + bucketTimeMillis,
+ *      where bucketTime can be provided in the taskConfigs (default 1d)
+ *  - If the execution window is not older than bufferTimeMillis, no task will be generated,
+ *      where bufferTime can be provided in the taskConfigs (default 2d)
+ *  - Segment metadata is scanned for all COMPLETED segments, to pick those containing data in window [windowStartMillis, windowEndMillis)
+ *  - A PinotTaskConfig is created, with segment information, execution window, and any config specific to the task
+ */
+public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskGenerator.class);
+
+  private static final String DEFAULT_BUCKET_PERIOD = "1d";
+  private static final String DEFAULT_BUFFER_PERIOD = "2d";
+
+  private final ClusterInfoProvider _clusterInfoProvider;
+
+  public RealtimeToOfflineSegmentsTaskGenerator(ClusterInfoProvider clusterInfoProvider) {
+    _clusterInfoProvider = clusterInfoProvider;
+  }
+
+  @Override
+  public String getTaskType() {
+    return RealtimeToOfflineSegmentsTask.TASK_TYPE;
+  }
+
+  @Override
+  public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
+    String taskType = RealtimeToOfflineSegmentsTask.TASK_TYPE;
+    List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+
+    for (TableConfig tableConfig : tableConfigs) {
+      String tableName = tableConfig.getTableName();
+
+      if (tableConfig.getTableType() != TableType.REALTIME) {
+        LOGGER.warn("Skip generating task: {} for non-REALTIME table: {}", taskType, tableName);
+        continue;
+      }
+      LOGGER.info("Start generating task configs for table: {} for task: {}", tableName, taskType);
+
+      String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+      String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
+
+      // Only schedule 1 task of this type, per table
+      Map<String, TaskState> nonCompletedTasks =
+          TaskGeneratorUtils.getNonCompletedTasks(taskType, realtimeTableName, _clusterInfoProvider);
+      if (!nonCompletedTasks.isEmpty()) {
+        LOGGER.warn("Found non-completed tasks: {} for same table: {}. Skipping task generation.",
+            nonCompletedTasks.keySet(), realtimeTableName);
+        continue;
+      }
+
+      // Get all completed segment metadata.
+      List<LLCRealtimeSegmentZKMetadata> completedSegmentsMetadata = new ArrayList<>();
+      Map<Integer, String> partitionToLatestCompletedSegmentName = new HashMap<>();
+      Set<Integer> allPartitions = new HashSet<>();
+      getCompletedSegmentsInfo(realtimeTableName, completedSegmentsMetadata, partitionToLatestCompletedSegmentName,
+          allPartitions);
+      if (completedSegmentsMetadata.isEmpty()) {
+        LOGGER
+            .info("No realtime completed segments found for table: {}, skipping task generation: {}", realtimeTableName,
+                taskType);
+        continue;
+      }
+      allPartitions.removeAll(partitionToLatestCompletedSegmentName.keySet());
+      if (!allPartitions.isEmpty()) {
+        LOGGER
+            .info("Partitions: {} have no completed segments. Table: {} is not ready for {}. Skipping task generation.",
+                allPartitions, realtimeTableName, taskType);
+        continue;
+      }
+
+      TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
+      Preconditions.checkState(tableTaskConfig != null);
+      Map<String, String> taskConfigs = tableTaskConfig.getConfigsForTaskType(taskType);
+      Preconditions.checkState(taskConfigs != null, "Task config shouldn't be null for table: %s", tableName);
+
+      // Get the bucket size and buffer
+      String bucketTimeStr =
+          taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUCKET_TIME_PERIOD_KEY, DEFAULT_BUCKET_PERIOD);
+      String bufferTimeStr =
+          taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUFFER_TIME_PERIOD_KEY, DEFAULT_BUFFER_PERIOD);
+      String startTimeStr = taskConfigs.get(RealtimeToOfflineSegmentsTask.START_TIME_MILLIS_KEY);
+      long bucketMillis = TimeUtils.convertPeriodToMillis(bucketTimeStr);
+      long bufferMillis = TimeUtils.convertPeriodToMillis(bufferTimeStr);
+
+      // Get watermark from RealtimeToOfflineSegmentsTaskMetadata ZNode. WindowStart = watermark. WindowEnd = windowStart + bucket.
+      long windowStartMillis =
+          getWatermarkMillis(realtimeTableName, completedSegmentsMetadata, startTimeStr, bucketMillis);
+      long windowEndMillis = windowStartMillis + bucketMillis;
+
+      // Check that execution window is older than bufferTime
+      if (windowEndMillis > System.currentTimeMillis() - bufferMillis) {
+        LOGGER.info(
+            "Window with start: {} and end: {} is not older than buffer time: {} configured as {} ago. Skipping task generation: {}",
+            windowStartMillis, windowEndMillis, bufferMillis, bufferTimeStr, taskType);
+        continue;
+      }
+
+      // Find all COMPLETED segments with data overlapping execution window: windowStart (inclusive) to windowEnd (exclusive)
+      List<String> segmentNames = new ArrayList<>();
+      List<String> downloadURLs = new ArrayList<>();
+      Set<String> lastCompletedSegmentPerPartition = new HashSet<>(partitionToLatestCompletedSegmentName.values());
+      boolean skipGenerate = false;
+      for (LLCRealtimeSegmentZKMetadata realtimeSegmentZKMetadata : completedSegmentsMetadata) {
+        String segmentName = realtimeSegmentZKMetadata.getSegmentName();
+        TimeUnit timeUnit = realtimeSegmentZKMetadata.getTimeUnit();
+        long segmentStartTimeMillis = timeUnit.toMillis(realtimeSegmentZKMetadata.getStartTime());
+        long segmentEndTimeMillis = timeUnit.toMillis(realtimeSegmentZKMetadata.getEndTime());
+
+        // Check overlap with window
+        if (windowStartMillis <= segmentEndTimeMillis && segmentStartTimeMillis < windowEndMillis) {
+          // If last completed segment is being used, make sure that segment crosses over end of window.

Review comment:
       +1
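
For reference, the window arithmetic in the quoted hunk can be isolated as the sketch below. `WindowSketch` and its method names are hypothetical; the three conditions mirror the expressions in the diff (window end = start + bucket, the buffer check, and the overlap check with an inclusive segment end time), with the current time passed in so the logic stays deterministic.

```java
final class WindowSketch {
  // windowEndMillis = windowStartMillis + bucketMillis, giving the window [start, end).
  static long windowEnd(long windowStartMillis, long bucketMillis) {
    return windowStartMillis + bucketMillis;
  }

  // A task is generated only when the whole window is at least bufferMillis old.
  // This is the negation of the skip condition: windowEnd > now - buffer.
  static boolean isOlderThanBuffer(long windowEndMillis, long nowMillis, long bufferMillis) {
    return windowEndMillis <= nowMillis - bufferMillis;
  }

  // Segment time range is [segmentStart, segmentEnd] with an inclusive end,
  // window is [windowStart, windowEnd) with an exclusive end.
  static boolean overlaps(long windowStartMillis, long windowEndMillis, long segmentStartMillis,
      long segmentEndMillis) {
    return windowStartMillis <= segmentEndMillis && segmentStartMillis < windowEndMillis;
  }
}
```

With a watermark of 1597795200000 and the default 1d bucket, the window ends at 1597881600000. Under the default 2d buffer, the task becomes eligible once the current time reaches 1597795200000 plus three days. A segment ending exactly at the window start overlaps, while one starting exactly at the window end does not.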

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
##########
@@ -0,0 +1,238 @@
+package org.apache.pinot.controller.helix.core.minion.generator;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
+import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
+import org.apache.pinot.common.utils.CommonConstants.Segment;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A {@link PinotTaskGenerator} implementation for generating tasks of type {@link RealtimeToOfflineSegmentsTask}
+ *
+ * These will be generated only for REALTIME tables.
+ * At any given time, only 1 task of this type should be generated for a table.
+ *
+ * Steps:
+ *  - The watermarkMillis is read from the {@link RealtimeToOfflineSegmentsTaskMetadata} ZNode found at MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/tableNameWithType
+ *      In case of cold-start, no ZNode will exist.
+ *      A new ZNode will be created, with watermarkMillis as the smallest time found in the COMPLETED segments (or using start time config)
+ *  - The execution window for the task is calculated as, windowStartMillis = watermarkMillis, windowEndMillis = windowStartMillis + bucketTimeMillis,
+ *      where bucketTime can be provided in the taskConfigs (default 1d)
+ *  - If the execution window is not older than bufferTimeMillis, no task will be generated,
+ *      where bufferTime can be provided in the taskConfigs (default 2d)
+ *  - Segment metadata is scanned for all COMPLETED segments, to pick those containing data in window [windowStartMillis, windowEndMillis)
+ *  - A PinotTaskConfig is created, with segment information, execution window, and any config specific to the task
+ */
+public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskGenerator.class);
+
+  private static final String DEFAULT_BUCKET_PERIOD = "1d";
+  private static final String DEFAULT_BUFFER_PERIOD = "2d";
+
+  private final ClusterInfoProvider _clusterInfoProvider;
+
+  public RealtimeToOfflineSegmentsTaskGenerator(ClusterInfoProvider clusterInfoProvider) {
+    _clusterInfoProvider = clusterInfoProvider;
+  }
+
+  @Override
+  public String getTaskType() {
+    return RealtimeToOfflineSegmentsTask.TASK_TYPE;
+  }
+
+  @Override
+  public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
+    String taskType = RealtimeToOfflineSegmentsTask.TASK_TYPE;
+
+    List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+
+    for (TableConfig tableConfig : tableConfigs) {
+      String tableName = tableConfig.getTableName();
+
+      if (tableConfig.getTableType() != TableType.REALTIME) {
+        LOGGER.warn("Skip generating task: {} for non-REALTIME table: {}", taskType, tableName);
+        continue;
+      }
+
+      String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+      String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
+
+      // Only schedule 1 task of this type, per table
+      Map<String, TaskState> nonCompletedTasks =
+          TaskGeneratorUtils.getNonCompletedTasks(taskType, realtimeTableName, _clusterInfoProvider);
+      if (!nonCompletedTasks.isEmpty()) {
+        LOGGER.warn("Found non-completed tasks: {} for same table: {}. Skipping scheduling new task.",
+            nonCompletedTasks.keySet(), realtimeTableName);

Review comment:
       IMO it is fine to log one warning per table. It is quite a critical
issue if the minion gets stuck on this job (data loss if it doesn't move the
segments in time), so we should detect it as soon as possible.

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
##########
@@ -0,0 +1,308 @@
+package org.apache.pinot.controller.helix.core.minion.generator;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
+import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
+import org.apache.pinot.common.utils.CommonConstants.Segment;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A {@link PinotTaskGenerator} implementation for generating tasks of type {@link RealtimeToOfflineSegmentsTask}
+ *
+ * These will be generated only for REALTIME tables.
+ * At any given time, only 1 task of this type should be generated for a table.
+ *
+ * Steps:
+ *  - The watermarkMillis is read from the {@link RealtimeToOfflineSegmentsTaskMetadata} ZNode found at MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/tableNameWithType
+ *      In case of cold-start, no ZNode will exist.
+ *      A new ZNode will be created, with watermarkMillis as the smallest time found in the COMPLETED segments (or using start time config)
+ *  - The execution window for the task is calculated as, windowStartMillis = watermarkMillis, windowEndMillis = windowStartMillis + bucketTimeMillis,
+ *      where bucketTime can be provided in the taskConfigs (default 1d)
+ *  - If the execution window is not older than bufferTimeMillis, no task will be generated,
+ *      where bufferTime can be provided in the taskConfigs (default 2d)
+ *  - Segment metadata is scanned for all COMPLETED segments, to pick those containing data in window [windowStartMillis, windowEndMillis)
+ *  - A PinotTaskConfig is created, with segment information, execution window, and any config specific to the task
+ */
+public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskGenerator.class);
+
+  private static final String DEFAULT_BUCKET_PERIOD = "1d";
+  private static final String DEFAULT_BUFFER_PERIOD = "2d";
+
+  private final ClusterInfoProvider _clusterInfoProvider;
+
+  public RealtimeToOfflineSegmentsTaskGenerator(ClusterInfoProvider clusterInfoProvider) {
+    _clusterInfoProvider = clusterInfoProvider;
+  }
+
+  @Override
+  public String getTaskType() {
+    return RealtimeToOfflineSegmentsTask.TASK_TYPE;
+  }
+
+  @Override
+  public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
+    String taskType = RealtimeToOfflineSegmentsTask.TASK_TYPE;
+    List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+
+    for (TableConfig tableConfig : tableConfigs) {
+      String tableName = tableConfig.getTableName();
+
+      if (tableConfig.getTableType() != TableType.REALTIME) {
+        LOGGER.warn("Skip generating task: {} for non-REALTIME table: {}", taskType, tableName);
+        continue;
+      }
+      LOGGER.info("Start generating task configs for table: {} for task: {}", tableName, taskType);
+
+      String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+      String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
+
+      // Only schedule 1 task of this type, per table
+      Map<String, TaskState> nonCompletedTasks =
+          TaskGeneratorUtils.getNonCompletedTasks(taskType, realtimeTableName, _clusterInfoProvider);
+      if (!nonCompletedTasks.isEmpty()) {
+        LOGGER.warn("Found non-completed tasks: {} for same table: {}. Skipping task generation.",
+            nonCompletedTasks.keySet(), realtimeTableName);
+        continue;
+      }
+
+      // Get all completed segment metadata.
+      List<LLCRealtimeSegmentZKMetadata> completedSegmentsMetadata = new ArrayList<>();
+      Map<Integer, String> partitionToLatestCompletedSegmentName = new HashMap<>();
+      Set<Integer> allPartitions = new HashSet<>();
+      getCompletedSegmentsInfo(realtimeTableName, completedSegmentsMetadata, partitionToLatestCompletedSegmentName,
+          allPartitions);
+      if (completedSegmentsMetadata.isEmpty()) {
+        LOGGER
+            .info("No realtime completed segments found for table: {}, skipping task generation: {}", realtimeTableName,
+                taskType);
+        continue;
+      }
+      allPartitions.removeAll(partitionToLatestCompletedSegmentName.keySet());
+      if (!allPartitions.isEmpty()) {
+        LOGGER
+            .info("Partitions: {} have no completed segments. Table: {} is not ready for {}. Skipping task generation.",
+                allPartitions, realtimeTableName, taskType);
+        continue;
+      }
+
+      TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
+      Preconditions.checkState(tableTaskConfig != null);
+      Map<String, String> taskConfigs = tableTaskConfig.getConfigsForTaskType(taskType);
+      Preconditions.checkState(taskConfigs != null, "Task config shouldn't be 
null for table: {}", tableName);
+
+      // Get the bucket size and buffer
+      String bucketTimeStr =
+          
taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUCKET_TIME_PERIOD_KEY, 
DEFAULT_BUCKET_PERIOD);
+      String bufferTimeStr =
+          
taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUFFER_TIME_PERIOD_KEY, 
DEFAULT_BUFFER_PERIOD);

Review comment:
       ```suggestion
         String bucketTimePeriod =
             
taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUCKET_TIME_PERIOD_KEY, 
DEFAULT_BUCKET_PERIOD);
         String bufferTimePeriod =
             
taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUFFER_TIME_PERIOD_KEY, 
DEFAULT_BUFFER_PERIOD);
   ```

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
##########
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion.generator;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
+import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
+import org.apache.pinot.common.utils.CommonConstants.Segment;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider;
+import org.apache.pinot.core.common.MinionConstants;
+import 
org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A {@link PinotTaskGenerator} implementation for generating tasks of type 
{@link RealtimeToOfflineSegmentsTask}
+ *
+ * These will be generated only for REALTIME tables.
+ * At any given time, only 1 task of this type should be generated for a table.
+ *
+ * Steps:
+ *  - The watermarkMillis is read from the {@link 
RealtimeToOfflineSegmentsTaskMetadata} ZNode found at 
MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/tableNameWithType
+ *      In case of cold-start, no ZNode will exist.
+ *      A new ZNode will be created, with watermarkMillis as the smallest time 
found in the COMPLETED segments (or using start time config)
+ *  - The execution window for the task is calculated as, windowStartMillis = 
watermarkMillis, windowEndMillis = windowStartMillis + bucketTimeMillis,
+ *      where bucketTime can be provided in the taskConfigs (default 1d)
+ *  - If the execution window is not older than bufferTimeMillis, no task will 
be generated,
+ *      where bufferTime can be provided in the taskConfigs (default 2d)
+ *  - Segment metadata is scanned for all COMPLETED segments, to pick those 
containing data in window [windowStartMillis, windowEndMillis)
+ *  - A PinotTaskConfig is created, with segment information, execution 
window, and any config specific to the task
+ */
+public class RealtimeToOfflineSegmentsTaskGenerator implements 
PinotTaskGenerator {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskGenerator.class);
+
+  private static final String DEFAULT_BUCKET_PERIOD = "1d";
+  private static final String DEFAULT_BUFFER_PERIOD = "2d";
+
+  private final ClusterInfoProvider _clusterInfoProvider;
+
+  public RealtimeToOfflineSegmentsTaskGenerator(ClusterInfoProvider 
clusterInfoProvider) {
+    _clusterInfoProvider = clusterInfoProvider;
+  }
+
+  @Override
+  public String getTaskType() {
+    return RealtimeToOfflineSegmentsTask.TASK_TYPE;
+  }
+
+  @Override
+  public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
+    String taskType = RealtimeToOfflineSegmentsTask.TASK_TYPE;
+    List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+
+    for (TableConfig tableConfig : tableConfigs) {
+      String tableName = tableConfig.getTableName();
+
+      if (tableConfig.getTableType() != TableType.REALTIME) {
+        LOGGER.warn("Skip generating task: {} for non-REALTIME table: {}", 
taskType, tableName);
+        continue;
+      }
+      LOGGER.info("Start generating task configs for table: {} for task: {}", 
tableName, taskType);
+
+      String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+      String realtimeTableName = 
TableNameBuilder.REALTIME.tableNameWithType(rawTableName);

Review comment:
       (nit) No need to extract raw table name
   ```suggestion
         String realtimeTableName = tableConfig.getTableName();
   
         if (tableConfig.getTableType() != TableType.REALTIME) {
           LOGGER.warn("Skip generating task: {} for non-REALTIME table: {}", 
taskType, realtimeTableName);
           continue;
         }
         LOGGER.info("Start generating task configs for table: {} for task: 
{}", realtimeTableName, taskType);
   ```
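
To illustrate the nit: once the REALTIME check has passed, stripping the type suffix and appending it again should give back the same string. A minimal sketch, assuming the `<rawName>_REALTIME` naming convention and that the config's table name already carries the type suffix; `TableNameSketch` and its methods are hypothetical stand-ins, not the real `TableNameBuilder` API.

```java
/** Hypothetical stand-in for the table-name helpers; only the suffix handling is modelled. */
final class TableNameSketch {
  private static final String REALTIME_SUFFIX = "_REALTIME";
  private static final String OFFLINE_SUFFIX = "_OFFLINE";

  private TableNameSketch() {
  }

  /** Strips a trailing type suffix if one is present. */
  static String extractRawTableName(String tableName) {
    if (tableName.endsWith(REALTIME_SUFFIX)) {
      return tableName.substring(0, tableName.length() - REALTIME_SUFFIX.length());
    }
    if (tableName.endsWith(OFFLINE_SUFFIX)) {
      return tableName.substring(0, tableName.length() - OFFLINE_SUFFIX.length());
    }
    return tableName;
  }

  /** Appends the REALTIME suffix to a raw table name. */
  static String realtimeTableName(String rawTableName) {
    return rawTableName + REALTIME_SUFFIX;
  }
}
```

For a name such as `myTable_REALTIME`, the round trip is the identity, so the extra two calls add nothing after the table-type check.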

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
##########
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion.generator;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
+import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
+import org.apache.pinot.common.utils.CommonConstants.Segment;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider;
+import org.apache.pinot.core.common.MinionConstants;
+import 
org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A {@link PinotTaskGenerator} implementation for generating tasks of type 
{@link RealtimeToOfflineSegmentsTask}
+ *
+ * These will be generated only for REALTIME tables.
+ * At any given time, only 1 task of this type should be generated for a table.
+ *
+ * Steps:
+ *  - The watermarkMillis is read from the {@link 
RealtimeToOfflineSegmentsTaskMetadata} ZNode found at 
MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/tableNameWithType

Review comment:
       Reformat this part of the javadoc (keep each line less than 120 
characters)

##########
File path: 
pinot-common/src/main/java/org/apache/pinot/common/minion/RealtimeToOfflineSegmentsTaskMetadata.java
##########
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.minion;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.helix.ZNRecord;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+/**
+ * Metadata for the minion task of type 
<code>realtimeToOfflineSegmentsTask</code>.
+ * The <code>watermarkMillis</code> denotes the time (exclusive) up to which 
tasks have been executed.
+ *
+ * This gets serialized and stored in zookeeper under the path 
MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/tableNameWithType
+ *
+ * PinotTaskGenerator:
+ * The <code>watermarkMillis</code> is used by the 
<code>RealtimeToOfflineSegmentsTaskGenerator</code>,
+ * to determine the window of execution for the task it is generating.
+ * The window of execution will be [watermarkMillis, watermarkMillis + 
bucketSize)
+ *
+ * PinotTaskExecutor:
+ * The same watermark is used by the 
<code>RealtimeToOfflineSegmentsTaskExecutor</code>, to:
+ * - Verify that it is running the latest task scheduled by the task generator
+ * - Update the watermark as the end of the window that it executed for
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class RealtimeToOfflineSegmentsTaskMetadata {
+
+  private static final String WATERMARK_KEY = "watermarkMillis";
+
+  private final String _tableNameWithType;
+  private long _watermarkMillis;

Review comment:
       Should we change this to final and make the metadata immutable?
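
A minimal sketch of what an immutable variant could look like, assuming updates are expressed by building a new object rather than calling a setter. `TaskMetadataSketch` and `withWatermarkMs` are hypothetical names used for illustration only; ZNRecord and JSON handling are left out.

```java
/** Hypothetical immutable metadata holder: both fields are final and there is no setter. */
final class TaskMetadataSketch {
  private final String _tableNameWithType;
  private final long _watermarkMs;

  TaskMetadataSketch(String tableNameWithType, long watermarkMs) {
    _tableNameWithType = tableNameWithType;
    _watermarkMs = watermarkMs;
  }

  String getTableNameWithType() {
    return _tableNameWithType;
  }

  long getWatermarkMs() {
    return _watermarkMs;
  }

  /** Returns a new instance with the updated watermark; this instance is left untouched. */
  TaskMetadataSketch withWatermarkMs(long newWatermarkMs) {
    return new TaskMetadataSketch(_tableNameWithType, newWatermarkMs);
  }
}
```

With this shape, the executor would construct a fresh metadata object carrying the new watermark when it finishes a window, instead of mutating the one it read.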

##########
File path: 
pinot-minion/src/main/java/org/apache/pinot/minion/executor/PinotTaskExecutor.java
##########
@@ -26,6 +26,12 @@
  */
 public interface PinotTaskExecutor {
 
+  /**
+   * Pre processing operations to be done at the beginning of task execution
+   */
+  default void preProcess(PinotTaskConfig pinotTaskConfig) {

Review comment:
       I don't think we should put `preProcess()` and `postProcess()` into the 
interface. You can still keep them in the 
`RealtimeToOfflineSegmentsTaskExecutor` for better organization of the code.
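
A rough sketch of that arrangement, assuming the interface keeps a single entry point and the concrete executor calls its own private hooks around the conversion. All type and method names below are hypothetical simplifications, not the real Pinot signatures.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the executor interface: one entry point, no lifecycle hooks. */
interface TaskExecutorSketch {
  String executeTask(String tableNameWithType);
}

/** The hooks stay private to the one executor that needs them. */
final class RealtimeToOfflineExecutorSketch implements TaskExecutorSketch {
  private final List<String> _steps = new ArrayList<>();

  @Override
  public String executeTask(String tableNameWithType) {
    preProcess(tableNameWithType);
    String result = convert(tableNameWithType);
    postProcess(tableNameWithType);
    return result;
  }

  /** Steps recorded so far, in execution order. */
  List<String> getSteps() {
    return new ArrayList<>(_steps);
  }

  private void preProcess(String tableNameWithType) {
    _steps.add("preProcess");
  }

  private String convert(String tableNameWithType) {
    _steps.add("convert");
    return "converted:" + tableNameWithType;
  }

  private void postProcess(String tableNameWithType) {
    _steps.add("postProcess");
  }
}
```

Other executors are unaffected because nothing new appears on the interface.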

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorUtils.java
##########
@@ -68,4 +69,26 @@
     }
     return runningSegments;
   }
+
+  /**
+   * Gets all the tasks for the provided task type and tableName, which do not 
have TaskState COMPLETED
+   * @return map containing task name to task state for non-completed tasks
+   */
+  public static Map<String, TaskState> getNonCompletedTasks(String taskType, 
String tableNameWithType,
+      ClusterInfoProvider clusterInfoProvider) {
+
+    Map<String, TaskState> nonCompletedTasks = new HashMap<>();
+    Map<String, TaskState> taskStates = 
clusterInfoProvider.getTaskStates(taskType);
+    for (Map.Entry<String, TaskState> entry : taskStates.entrySet()) {
+      if (entry.getValue() == TaskState.COMPLETED) {
+        continue;
+      }

Review comment:
       Also skip the old tasks?
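
One possible reading of this, sketched under two loud assumptions: "old" means scheduled more than one day ago, and the schedule time can be parsed from a numeric suffix after the last underscore in the task name. The `State` enum and all names below are hypothetical; the real code would use Helix's `TaskState` and whatever cutoff the project already applies.

```java
import java.util.HashMap;
import java.util.Map;

final class NonCompletedTasksSketch {
  /** Hypothetical stand-in for the Helix task state. */
  enum State { IN_PROGRESS, COMPLETED, FAILED }

  /** Assumed cutoff: tasks scheduled more than one day ago count as old. */
  static final long ONE_DAY_MS = 24 * 60 * 60 * 1000L;

  private NonCompletedTasksSketch() {
  }

  /** Assumption: the task name ends with "_<scheduleTimeMs>". */
  static long scheduleTimeMs(String taskName) {
    return Long.parseLong(taskName.substring(taskName.lastIndexOf('_') + 1));
  }

  /** Keeps tasks that are neither COMPLETED nor older than the cutoff. */
  static Map<String, State> recentNonCompletedTasks(Map<String, State> taskStates, long nowMs) {
    Map<String, State> result = new HashMap<>();
    for (Map.Entry<String, State> entry : taskStates.entrySet()) {
      if (entry.getValue() == State.COMPLETED) {
        continue;
      }
      if (scheduleTimeMs(entry.getKey()) < nowMs - ONE_DAY_MS) {
        continue;
      }
      result.put(entry.getKey(), entry.getValue());
    }
    return result;
  }
}
```

Without such a cutoff, a task that failed long ago would block new task generation for the table indefinitely.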

##########
File path: 
pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutor.java
##########
@@ -162,11 +212,34 @@
     for (File file : outputSegmentsDir.listFiles()) {
       String outputSegmentName = file.getName();
       results.add(new 
SegmentConversionResult.Builder().setFile(file).setSegmentName(outputSegmentName)
-          .setTableNameWithType(tableNameWithType).build());
+          .setTableNameWithType(offlineTableName).build());
     }
     return results;
   }
 
+  /**
+   * Fetches the realtimeToOfflineSegmentsTask metadata ZNode for the realtime 
table.
+   * Checks that the version of the ZNode matches with the version cached 
earlier. If yes, proceeds to update watermark in the ZNode
+   * TODO: Making the minion task update the ZK metadata is an anti-pattern, 
however cannot find another way to do it
+   */
+  @Override
+  public void postProcess(PinotTaskConfig pinotTaskConfig) {
+    String realtimeTableName = 
pinotTaskConfig.getConfigs().get(MinionConstants.TABLE_NAME_KEY);
+
+    ZNRecord realtimeToOfflineSegmentsTaskRecord =

Review comment:
       No need to check the version again; 
`setRealtimeToOfflineSegmentsTaskMetadata()` can perform the version check.
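
The idea can be sketched with an in-memory stand-in for the ZNode, assuming the setter takes the expected version and rejects the write on a mismatch. `VersionedNodeSketch` is hypothetical and does not mirror the real metadata manager's signature.

```java
/** Hypothetical in-memory node: a value plus a version that increments on every write. */
final class VersionedNodeSketch {
  private long _watermarkMs;
  private int _version;

  VersionedNodeSketch(long watermarkMs) {
    _watermarkMs = watermarkMs;
    _version = 0;
  }

  synchronized long getWatermarkMs() {
    return _watermarkMs;
  }

  synchronized int getVersion() {
    return _version;
  }

  /**
   * Writes only if expectedVersion equals the stored version, so the caller
   * needs no separate read-and-compare step before writing.
   */
  synchronized boolean setWatermarkMs(long newWatermarkMs, int expectedVersion) {
    if (expectedVersion != _version) {
      return false;
    }
    _watermarkMs = newWatermarkMs;
    _version++;
    return true;
  }
}
```

A separate fetch-and-compare before the write also leaves a window in which another writer can slip in, which the single conditional write avoids.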

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
##########
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion.generator;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
+import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
+import org.apache.pinot.common.utils.CommonConstants.Segment;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider;
+import org.apache.pinot.core.common.MinionConstants;
+import 
org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A {@link PinotTaskGenerator} implementation for generating tasks of type 
{@link RealtimeToOfflineSegmentsTask}
+ *
+ * These will be generated only for REALTIME tables.
+ * At any given time, only 1 task of this type should be generated for a table.
+ *
+ * Steps:
+ *  - The watermarkMillis is read from the {@link 
RealtimeToOfflineSegmentsTaskMetadata} ZNode found at 
MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/tableNameWithType
+ *      In case of cold-start, no ZNode will exist.
+ *      A new ZNode will be created, with watermarkMillis as the smallest time 
found in the COMPLETED segments (or using start time config)
+ *  - The execution window for the task is calculated as, windowStartMillis = 
watermarkMillis, windowEndMillis = windowStartMillis + bucketTimeMillis,
+ *      where bucketTime can be provided in the taskConfigs (default 1d)
+ *  - If the execution window is not older than bufferTimeMillis, no task will 
be generated,
+ *      where bufferTime can be provided in the taskConfigs (default 2d)
+ *  - Segment metadata is scanned for all COMPLETED segments, to pick those 
containing data in window [windowStartMillis, windowEndMillis)
+ *  - A PinotTaskConfig is created, with segment information, execution 
window, and any config specific to the task
+ */
+public class RealtimeToOfflineSegmentsTaskGenerator implements 
PinotTaskGenerator {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskGenerator.class);
+
+  private static final String DEFAULT_BUCKET_PERIOD = "1d";
+  private static final String DEFAULT_BUFFER_PERIOD = "2d";
+
+  private final ClusterInfoProvider _clusterInfoProvider;
+
+  public RealtimeToOfflineSegmentsTaskGenerator(ClusterInfoProvider 
clusterInfoProvider) {
+    _clusterInfoProvider = clusterInfoProvider;
+  }
+
+  @Override
+  public String getTaskType() {
+    return RealtimeToOfflineSegmentsTask.TASK_TYPE;
+  }
+
+  @Override
+  public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
+    String taskType = RealtimeToOfflineSegmentsTask.TASK_TYPE;
+    List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+
+    for (TableConfig tableConfig : tableConfigs) {
+      String tableName = tableConfig.getTableName();
+
+      if (tableConfig.getTableType() != TableType.REALTIME) {
+        LOGGER.warn("Skip generating task: {} for non-REALTIME table: {}", 
taskType, tableName);
+        continue;
+      }
+      LOGGER.info("Start generating task configs for table: {} for task: {}", 
tableName, taskType);
+
+      String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+      String realtimeTableName = 
TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
+
+      // Only schedule 1 task of this type, per table
+      Map<String, TaskState> nonCompletedTasks =
+          TaskGeneratorUtils.getNonCompletedTasks(taskType, realtimeTableName, 
_clusterInfoProvider);
+      if (!nonCompletedTasks.isEmpty()) {
+        LOGGER.warn("Found non-completed tasks: {} for same table: {}. 
Skipping task generation.",
+            nonCompletedTasks.keySet(), realtimeTableName);
+        continue;
+      }
+
+      // Get all completed segment metadata.
+      List<LLCRealtimeSegmentZKMetadata> completedSegmentsMetadata = new 
ArrayList<>();
+      Map<Integer, String> partitionToLatestCompletedSegmentName = new 
HashMap<>();
+      Set<Integer> allPartitions = new HashSet<>();
+      getCompletedSegmentsInfo(realtimeTableName, completedSegmentsMetadata, 
partitionToLatestCompletedSegmentName,
+          allPartitions);
+      if (completedSegmentsMetadata.isEmpty()) {
+        LOGGER
+            .info("No realtime completed segments found for table: {}, 
skipping task generation: {}", realtimeTableName,
+                taskType);
+        continue;
+      }
+      allPartitions.removeAll(partitionToLatestCompletedSegmentName.keySet());
+      if (!allPartitions.isEmpty()) {
+        LOGGER
+            .info("Partitions: {} have no completed segments. Table: {} is not 
ready for {}. Skipping task generation.",
+                allPartitions, realtimeTableName, taskType);
+        continue;
+      }
+
+      TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
+      Preconditions.checkState(tableTaskConfig != null);
+      Map<String, String> taskConfigs = 
tableTaskConfig.getConfigsForTaskType(taskType);
+      Preconditions.checkState(taskConfigs != null, "Task config shouldn't be 
null for table: {}", tableName);
+
+      // Get the bucket size and buffer
+      String bucketTimeStr =
+          
taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUCKET_TIME_PERIOD_KEY, 
DEFAULT_BUCKET_PERIOD);
+      String bufferTimeStr =
+          
taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUFFER_TIME_PERIOD_KEY, 
DEFAULT_BUFFER_PERIOD);
+      String startTimeStr = 
taskConfigs.get(RealtimeToOfflineSegmentsTask.START_TIME_MILLIS_KEY);
+      long bucketMillis = TimeUtils.convertPeriodToMillis(bucketTimeStr);
+      long bufferMillis = TimeUtils.convertPeriodToMillis(bufferTimeStr);

Review comment:
       Prefer the `Ms` suffix here; same for the other `Millis` names
   ```suggestion
         long bucketTimeMs = TimeUtils.convertPeriodToMillis(bucketTimeStr);
         long bufferTimeMs = TimeUtils.convertPeriodToMillis(bufferTimeStr);
   ```
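
With the `Ms` naming applied, the window arithmetic described in the class javadoc could read roughly as follows. This is a sketch under the assumption that "not older than the buffer" means the window end is later than `nowMs - bufferTimeMs`; `ExecutionWindowSketch` and its methods are hypothetical.

```java
/** Hypothetical holder for the half-open window [windowStartMs, windowEndMs). */
final class ExecutionWindowSketch {
  private final long _windowStartMs;
  private final long _windowEndMs;

  private ExecutionWindowSketch(long windowStartMs, long windowEndMs) {
    _windowStartMs = windowStartMs;
    _windowEndMs = windowEndMs;
  }

  /** The window starts at the watermark and spans one bucket. */
  static ExecutionWindowSketch fromWatermark(long watermarkMs, long bucketTimeMs) {
    return new ExecutionWindowSketch(watermarkMs, watermarkMs + bucketTimeMs);
  }

  long getWindowStartMs() {
    return _windowStartMs;
  }

  long getWindowEndMs() {
    return _windowEndMs;
  }

  /** True when the whole window lies at least bufferTimeMs in the past. */
  boolean isOlderThanBuffer(long nowMs, long bufferTimeMs) {
    return _windowEndMs <= nowMs - bufferTimeMs;
  }
}
```

With the defaults of a 1d bucket and a 2d buffer, a window starting at the watermark becomes eligible only once three days have elapsed since the watermark.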

##########
File path: 
pinot-minion/src/main/java/org/apache/pinot/minion/executor/PinotTaskExecutor.java
##########
@@ -26,6 +26,12 @@
  */
 public interface PinotTaskExecutor {
 
+  /**
+   * Pre processing operations to be done at the beginning of task execution
+   */
+  default void preProcess(PinotTaskConfig pinotTaskConfig) {

Review comment:
       > Why not make this a boolean return, so that if the pre-process fails 
we do not go further in the task execution? Not sure what helix does when we 
throw exceptions from the task executor -- probably retries the task, but that 
may not be good for us.
   
   I prefer throwing an exception when something goes wrong, which forces the 
caller to handle it. With a boolean, the caller could just ignore the return 
value, which could cause unexpected behavior. Minion code can handle the 
exception and fail the task.
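
A small sketch of the difference, assuming the pre-process step verifies that the stored watermark matches the task's window start (as the executor javadoc describes). The class, method names, and the returned status strings are hypothetical.

```java
final class PreProcessSketch {
  private PreProcessSketch() {
  }

  /** Throws instead of returning false, so a failed check cannot be silently ignored. */
  static void preProcess(long watermarkMs, long windowStartMs) {
    if (watermarkMs != windowStartMs) {
      throw new IllegalStateException(
          "watermarkMs: " + watermarkMs + " does not match windowStartMs: " + windowStartMs);
    }
  }

  /** Stand-in for the minion-side caller: it catches the exception and fails the task. */
  static String runTask(long watermarkMs, long windowStartMs) {
    try {
      preProcess(watermarkMs, windowStartMs);
    } catch (IllegalStateException e) {
      return "FAILED";
    }
    // The conversion would run here only when the check passed.
    return "SUCCEEDED";
  }
}
```

Had `preProcess` returned a boolean, a caller that forgot to inspect it would proceed to the conversion with a stale watermark.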

##########
File path: 
pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutor.java
##########
@@ -54,17 +57,60 @@
  * A task to convert segments from a REALTIME table to segments for its 
corresponding OFFLINE table.
  * The realtime segments could span across multiple time windows. This task 
extracts data and creates segments for a configured time range.
  * The {@link SegmentProcessorFramework} is used for the segment conversion, 
which also does
- * 1. time column rollup
- * 2. time window extraction using filter function
+ * 1. time window extraction using filter function
+ * 2. time column rollup
  * 3. partitioning using table config's segmentPartitioningConfig
  * 4. aggregations and rollup
  * 5. data sorting
+ *
+ * Before beginning the task, the <code>watermarkMillis</code> is checked in 
the minion task metadata ZNode, located at 
MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/<tableNameWithType>
+ * It should match the <code>windowStartMillis</code>.
+ * The version of the znode is cached.
+ *
+ * After the segments are uploaded, this task updates the 
<code>watermarkMillis</code> in the minion task metadata ZNode.
+ * The znode version is checked during update, and update only succeeds if 
version matches with the previously cached version
  */
 public class RealtimeToOfflineSegmentsTaskExecutor extends 
BaseMultipleSegmentsConversionExecutor {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskExecutor.class);
   private static final String INPUT_SEGMENTS_DIR = "input_segments";
   private static final String OUTPUT_SEGMENTS_DIR = "output_segments";
 
+  private final MinionTaskZkMetadataManager _minionTaskZkMetadataManager;
+  private int _expectedVersion = -1;

Review comment:
       Use `Integer.MIN_VALUE` as the default? We don't want to overwrite the 
ZNRecord if this is not set correctly.
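
A likely reason for the concern: in ZooKeeper's `setData`, an expected version of -1 matches any node version, so a field left at -1 would turn the conditional write into an unconditional one. The sketch below models only that matching rule and assumes the underlying write follows the same convention; `ExpectedVersionSketch` is a hypothetical name.

```java
final class ExpectedVersionSketch {
  /** ZooKeeper convention: an expected version of -1 matches any version. */
  static final int ANY_VERSION = -1;

  private ExpectedVersionSketch() {
  }

  /** Returns true when a write with expectedVersion would be accepted. */
  static boolean versionMatches(int expectedVersion, int actualVersion) {
    return expectedVersion == ANY_VERSION || expectedVersion == actualVersion;
  }
}
```

Under this rule an uninitialized `Integer.MIN_VALUE` can never match a real version, so a forgotten pre-process step makes the update fail rather than silently overwrite the node.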

##########
File path: 
pinot-common/src/main/java/org/apache/pinot/common/minion/RealtimeToOfflineSegmentsTaskMetadata.java
##########
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.minion;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.helix.ZNRecord;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+/**
+ * Metadata for the minion task of type 
<code>realtimeToOfflineSegmentsTask</code>.
+ * The <code>watermarkMillis</code> denotes the time (exclusive) up to which 
tasks have been executed.
+ *
+ * This gets serialized and stored in zookeeper under the path 
MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/tableNameWithType
+ *
+ * PinotTaskGenerator:
+ * The <code>watermarkMillis</code> is used by the 
<code>RealtimeToOfflineSegmentsTaskGenerator</code>,
+ * to determine the window of execution for the task it is generating.
+ * The window of execution will be [watermarkMillis, watermarkMillis + 
bucketSize)
+ *
+ * PinotTaskExecutor:
+ * The same watermark is used by the 
<code>RealtimeToOfflineSegmentsTaskExecutor</code>, to:
+ * - Verify that it is running the latest task scheduled by the task generator
+ * - Update the watermark as the end of the window that it executed for
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class RealtimeToOfflineSegmentsTaskMetadata {
+
+  private static final String WATERMARK_KEY = "watermarkMillis";

Review comment:
       I would suggest `watermarkMs` to be consistent with other classes

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
##########
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion.generator;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
+import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
+import org.apache.pinot.common.utils.CommonConstants.Segment;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider;
+import org.apache.pinot.core.common.MinionConstants;
+import 
org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A {@link PinotTaskGenerator} implementation for generating tasks of type {@link RealtimeToOfflineSegmentsTask}
+ *
+ * These will be generated only for REALTIME tables.
+ * At any given time, only 1 task of this type should be generated for a table.
+ *
+ * Steps:
+ *  - The watermarkMillis is read from the {@link RealtimeToOfflineSegmentsTaskMetadata} ZNode found at MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/tableNameWithType
+ *      In case of cold-start, no ZNode will exist.
+ *      A new ZNode will be created, with watermarkMillis as the smallest time found in the COMPLETED segments (or using start time config)
+ *  - The execution window for the task is calculated as, windowStartMillis = waterMarkMillis, windowEndMillis = windowStartMillis + bucketTimeMillis,
+ *      where bucketTime can be provided in the taskConfigs (default 1d)
+ *  - If the execution window is not older than bufferTimeMillis, no task will be generated,
+ *      where bufferTime can be provided in the taskConfigs (default 2d)
+ *  - Segment metadata is scanned for all COMPLETED segments, to pick those containing data in window [windowStartMillis, windowEndMillis)
+ *  - A PinotTaskConfig is created, with segment information, execution window, and any config specific to the task
+ */
+public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskGenerator.class);
+
+  private static final String DEFAULT_BUCKET_PERIOD = "1d";
+  private static final String DEFAULT_BUFFER_PERIOD = "2d";
+
+  private final ClusterInfoProvider _clusterInfoProvider;
+
+  public RealtimeToOfflineSegmentsTaskGenerator(ClusterInfoProvider clusterInfoProvider) {
+    _clusterInfoProvider = clusterInfoProvider;
+  }
+
+  @Override
+  public String getTaskType() {
+    return RealtimeToOfflineSegmentsTask.TASK_TYPE;
+  }
+
+  @Override
+  public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
+    String taskType = RealtimeToOfflineSegmentsTask.TASK_TYPE;
+    List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+
+    for (TableConfig tableConfig : tableConfigs) {
+      String tableName = tableConfig.getTableName();
+
+      if (tableConfig.getTableType() != TableType.REALTIME) {
+        LOGGER.warn("Skip generating task: {} for non-REALTIME table: {}", taskType, tableName);
+        continue;
+      }
+      LOGGER.info("Start generating task configs for table: {} for task: {}", tableName, taskType);
+
+      String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+      String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
+
+      // Only schedule 1 task of this type, per table
+      Map<String, TaskState> nonCompletedTasks =

Review comment:
       We can keep the same logic as in `TaskGeneratorUtils.getRunningSegments()` to skip old tasks.
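
As a rough illustration of what "skipping old tasks" could look like when collecting the non-completed tasks for a table, here is a minimal, self-contained sketch. It is not the actual `TaskGeneratorUtils` code: the class `OldTaskFilterSketch`, the local `TaskState` enum (a stand-in for the Helix one), the one-day cutoff, and the assumption that a task name ends in `_<scheduleTimeMs>` are all hypothetical and only serve the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;

public class OldTaskFilterSketch {
  // Stand-in for org.apache.helix.task.TaskState so the sketch compiles on its own.
  public enum TaskState { NOT_STARTED, IN_PROGRESS, COMPLETED, FAILED }

  // Assumed cutoff: tasks scheduled more than one day ago are treated as stale.
  public static final long MAX_TASK_AGE_MS = TimeUnit.DAYS.toMillis(1);

  // Assumption: the task name ends with "_<scheduleTimeMs>".
  public static long scheduleTimeMs(String taskName) {
    return Long.parseLong(taskName.substring(taskName.lastIndexOf('_') + 1));
  }

  // Returns the names of tasks that are neither COMPLETED nor older than the cutoff.
  public static Set<String> activeTasks(Map<String, TaskState> taskStates, long nowMs) {
    Set<String> active = new TreeSet<>();
    for (Map.Entry<String, TaskState> entry : taskStates.entrySet()) {
      if (entry.getValue() == TaskState.COMPLETED) {
        continue; // finished tasks never block scheduling
      }
      if (nowMs - scheduleTimeMs(entry.getKey()) > MAX_TASK_AGE_MS) {
        continue; // stale task: ignore it instead of blocking new tasks forever
      }
      active.add(entry.getKey());
    }
    return active;
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    Map<String, TaskState> states = new HashMap<>();
    states.put("Task_demo_" + (now - TimeUnit.HOURS.toMillis(1)), TaskState.IN_PROGRESS);
    states.put("Task_demo_" + (now - TimeUnit.DAYS.toMillis(2)), TaskState.IN_PROGRESS);
    states.put("Task_demo_" + (now - TimeUnit.MINUTES.toMillis(10)), TaskState.COMPLETED);
    // Only the task scheduled one hour ago should remain.
    System.out.println(activeTasks(states, now));
  }
}
```

With a filter like this, a task stuck in a non-completed state for longer than the cutoff would no longer prevent the generator from scheduling the next task for the table.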




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org
