This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 1bf5d02  RealtimeToOfflineSegments task generator (#6124)
1bf5d02 is described below

commit 1bf5d021db25fe09d00e31871b5572c149ea29e6
Author: Neha Pawar <neha.pawa...@gmail.com>
AuthorDate: Wed Oct 21 09:51:57 2020 -0700

    RealtimeToOfflineSegments task generator (#6124)
    
    Here's the final piece of the Pinot-managed offline flows feature.
    This is the TaskGenerator, which creates tasks of the realtimeToOfflineSegmentsTask type.
    Typical use case:
    You have set up a realtime table and want to make it a hybrid table. With this feature,
    you don't have to write your own offline flows. Just set realtimeToOfflineSegmentsTask
    in your table config and bring up minions. The minion tasks will push data to the
    offline table, one day at a time.
    The window size, buffer, and segment processing configs are configurable via the
    table task config.
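    
    For illustration, here's a minimal sketch of enabling this task on a realtime table,
    using the TableConfigBuilder API exercised in the new tests. The literal config key
    strings "bucketTimePeriod" and "bufferTimePeriod" are assumptions inferred from the
    BUCKET_TIME_PERIOD_KEY / BUFFER_TIME_PERIOD_KEY constants referenced by the generator;
    the table name and time column are placeholders.
    
        import java.util.HashMap;
        import java.util.Map;
        import org.apache.pinot.core.common.MinionConstants;
        import org.apache.pinot.spi.config.table.TableConfig;
        import org.apache.pinot.spi.config.table.TableTaskConfig;
        import org.apache.pinot.spi.config.table.TableType;
        import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
        
        // Per-task configs: execution window size and the buffer a window must age past
        Map<String, String> taskConfigs = new HashMap<>();
        taskConfigs.put("bucketTimePeriod", "1d");  // assumed key string; default window size is 1d
        taskConfigs.put("bufferTimePeriod", "2d");  // assumed key string; default buffer is 2d
        
        Map<String, Map<String, String>> taskConfigsMap = new HashMap<>();
        taskConfigsMap.put(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, taskConfigs);
        
        // The table's existing low-level consumer stream configs (contents elided here)
        Map<String, String> streamConfigs = new HashMap<>();
        
        // Attach the task config to the realtime table config
        TableConfig realtimeTableConfig = new TableConfigBuilder(TableType.REALTIME)
            .setTableName("myTable")
            .setTimeColumnName("millisSinceEpoch")
            .setStreamConfigs(streamConfigs)
            .setTaskConfig(new TableTaskConfig(taskConfigsMap))
            .build();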
---
 .../pinot/common/metadata/ZKMetadataProvider.java  |   5 +
 .../common/minion/MinionTaskMetadataUtils.java     |  81 ++++
 .../RealtimeToOfflineSegmentsTaskMetadata.java     |  88 ++++
 .../RealtimeToOfflineSegmentsTaskMetadataTest.java |  46 +++
 ...rInfoProvider.java => ClusterInfoAccessor.java} |  40 +-
 .../helix/core/minion/PinotTaskManager.java        |  10 +-
 .../generator/ConvertToRawIndexTaskGenerator.java  |  14 +-
 .../RealtimeToOfflineSegmentsTaskGenerator.java    | 312 ++++++++++++++
 .../minion/generator/TaskGeneratorRegistry.java    |   7 +-
 .../core/minion/generator/TaskGeneratorUtils.java  |  52 ++-
 ...RealtimeToOfflineSegmentsTaskGeneratorTest.java | 452 +++++++++++++++++++++
 .../apache/pinot/core/common/MinionConstants.java  |  26 +-
 .../processing/framework/SegmentMapper.java        |  12 +-
 .../processing/framework/SegmentMapperTest.java    |   2 +-
 ...fflineSegmentsMinionClusterIntegrationTest.java | 216 ++++++++++
 .../tests/SimpleMinionClusterIntegrationTest.java  |  12 +-
 .../org/apache/pinot/minion/MinionStarter.java     |   4 +-
 .../BaseMultipleSegmentsConversionExecutor.java    |  15 +
 .../pinot/minion/executor/BaseTaskExecutor.java    |   3 +-
 .../executor/MinionTaskZkMetadataManager.java      |  57 +++
 .../RealtimeToOfflineSegmentsTaskExecutor.java     |  88 +++-
 ...altimeToOfflineSegmentsTaskExecutorFactory.java |  12 +-
 .../executor/TaskExecutorFactoryRegistry.java      |   4 +-
 .../RealtimeToOfflineSegmentsTaskExecutorTest.java |  95 +++--
 24 files changed, 1548 insertions(+), 105 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
index af5a5d6..9fa56fa 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
@@ -59,6 +59,7 @@ public class ZKMetadataProvider {
   private static final String PROPERTYSTORE_INSTANCE_CONFIGS_PREFIX = 
"/CONFIGS/INSTANCE";
   private static final String PROPERTYSTORE_CLUSTER_CONFIGS_PREFIX = 
"/CONFIGS/CLUSTER";
   private static final String PROPERTYSTORE_SEGMENT_LINEAGE = 
"/SEGMENT_LINEAGE";
+  private static final String PROPERTYSTORE_MINION_TASK_METADATA_PREFIX = 
"/MINION_TASK_METADATA";
 
   public static void setRealtimeTableConfig(ZkHelixPropertyStore<ZNRecord> 
propertyStore, String realtimeTableName,
       ZNRecord znRecord) {
@@ -116,6 +117,10 @@ public class ZKMetadataProvider {
     return StringUtil.join("/", PROPERTYSTORE_SEGMENT_LINEAGE, 
tableNameWithType);
   }
 
+  public static String constructPropertyStorePathForMinionTaskMetadata(String 
taskType, String tableNameWithType) {
+    return StringUtil.join("/", PROPERTYSTORE_MINION_TASK_METADATA_PREFIX, 
taskType, tableNameWithType);
+  }
+
   public static boolean isSegmentExisted(ZkHelixPropertyStore<ZNRecord> 
propertyStore, String resourceNameForResource,
       String segmentName) {
     return propertyStore
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionTaskMetadataUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionTaskMetadataUtils.java
new file mode 100644
index 0000000..43ac82c
--- /dev/null
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionTaskMetadataUtils.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.minion;
+
+import javax.annotation.Nullable;
+import org.I0Itec.zkclient.exception.ZkException;
+import org.apache.helix.AccessOption;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.HelixPropertyStore;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.zookeeper.data.Stat;
+
+
+/**
+ * Helper methods to fetch/persist ZNRecord for minion task metadata
+ */
+public final class MinionTaskMetadataUtils {
+
+  private MinionTaskMetadataUtils() {
+
+  }
+
+  /**
+   * Fetches the ZNRecord for the given minion task type and tableNameWithType, from MINION_TASK_METADATA/taskType/tableNameWithType
+   */
+  @Nullable
+  public static ZNRecord 
fetchMinionTaskMetadataZNRecord(HelixPropertyStore<ZNRecord> propertyStore, 
String taskType,
+      String tableNameWithType) {
+    String path = 
ZKMetadataProvider.constructPropertyStorePathForMinionTaskMetadata(taskType, 
tableNameWithType);
+    Stat stat = new Stat();
+    ZNRecord znRecord = propertyStore.get(path, stat, AccessOption.PERSISTENT);
+    if (znRecord != null) {
+      znRecord.setVersion(stat.getVersion());
+    }
+    return znRecord;
+  }
+
+  /**
+   * Fetches the ZNRecord for RealtimeToOfflineSegmentsTask for the given tableNameWithType from MINION_TASK_METADATA/RealtimeToOfflineSegmentsTask/tableNameWithType
+   * and converts it to a {@link RealtimeToOfflineSegmentsTaskMetadata} object
+   */
+  @Nullable
+  public static RealtimeToOfflineSegmentsTaskMetadata 
getRealtimeToOfflineSegmentsTaskMetadata(
+      HelixPropertyStore<ZNRecord> propertyStore, String taskType, String 
tableNameWithType) {
+    ZNRecord znRecord = fetchMinionTaskMetadataZNRecord(propertyStore, 
taskType, tableNameWithType);
+    return znRecord != null ? 
RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(znRecord) : null;
+  }
+
+  /**
+   * Persists the provided {@link RealtimeToOfflineSegmentsTaskMetadata} to MINION_TASK_METADATA/RealtimeToOfflineSegmentsTask/tableNameWithType.
+   * Will fail if expectedVersion does not match the current version of the ZNode.
+   * Set expectedVersion to -1 to skip the version check.
+   */
+  public static void 
persistRealtimeToOfflineSegmentsTaskMetadata(HelixPropertyStore<ZNRecord> 
propertyStore,
+      String taskType, RealtimeToOfflineSegmentsTaskMetadata 
realtimeToOfflineSegmentsTaskMetadata,
+      int expectedVersion) {
+    String path = 
ZKMetadataProvider.constructPropertyStorePathForMinionTaskMetadata(taskType,
+        realtimeToOfflineSegmentsTaskMetadata.getTableNameWithType());
+    if (!propertyStore
+        .set(path, realtimeToOfflineSegmentsTaskMetadata.toZNRecord(), 
expectedVersion, AccessOption.PERSISTENT)) {
+      throw new ZkException(
+          "Failed to persist minion RealtimeToOfflineSegmentsTask metadata: " 
+ realtimeToOfflineSegmentsTaskMetadata);
+    }
+  }
+}
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/minion/RealtimeToOfflineSegmentsTaskMetadata.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/minion/RealtimeToOfflineSegmentsTaskMetadata.java
new file mode 100644
index 0000000..2bd9c4c
--- /dev/null
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/minion/RealtimeToOfflineSegmentsTaskMetadata.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.minion;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.helix.ZNRecord;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+/**
+ * Metadata for the minion task of type <code>RealtimeToOfflineSegmentsTask</code>.
+ * The <code>watermarkMs</code> denotes the time (exclusive) up to which tasks have been executed.
+ *
+ * This gets serialized and stored in ZooKeeper under the path MINION_TASK_METADATA/RealtimeToOfflineSegmentsTask/tableNameWithType
+ *
+ * PinotTaskGenerator:
+ * The <code>watermarkMs</code> is used by the <code>RealtimeToOfflineSegmentsTaskGenerator</code>,
+ * to determine the window of execution for the task it is generating.
+ * The window of execution will be [watermarkMs, watermarkMs + bucketTimeMs)
+ *
+ * PinotTaskExecutor:
+ * The same watermark is used by the <code>RealtimeToOfflineSegmentsTaskExecutor</code>, to:
+ * - Verify that it is running the latest task scheduled by the task generator
+ * - Update the watermark to the end of the window that it executed for
+ */
+public class RealtimeToOfflineSegmentsTaskMetadata {
+
+  private static final String WATERMARK_KEY = "watermarkMs";
+
+  private final String _tableNameWithType;
+  private final long _watermarkMs;
+
+  public RealtimeToOfflineSegmentsTaskMetadata(String tableNameWithType, long 
watermarkMs) {
+    _tableNameWithType = tableNameWithType;
+    _watermarkMs = watermarkMs;
+  }
+
+  public String getTableNameWithType() {
+    return _tableNameWithType;
+  }
+
+  /**
+   * Get the watermark in millis
+   */
+  public long getWatermarkMs() {
+    return _watermarkMs;
+  }
+
+  public static RealtimeToOfflineSegmentsTaskMetadata fromZNRecord(ZNRecord 
znRecord) {
+    long watermark = znRecord.getLongField(WATERMARK_KEY, 0);
+    return new RealtimeToOfflineSegmentsTaskMetadata(znRecord.getId(), 
watermark);
+  }
+
+  public ZNRecord toZNRecord() {
+    ZNRecord znRecord = new ZNRecord(_tableNameWithType);
+    znRecord.setLongField(WATERMARK_KEY, _watermarkMs);
+    return znRecord;
+  }
+
+  public String toJsonString() {
+    try {
+      return JsonUtils.objectToString(this);
+    } catch (JsonProcessingException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return toJsonString();
+  }
+}
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/metadata/RealtimeToOfflineSegmentsTaskMetadataTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/metadata/RealtimeToOfflineSegmentsTaskMetadataTest.java
new file mode 100644
index 0000000..e5a4db2
--- /dev/null
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/metadata/RealtimeToOfflineSegmentsTaskMetadataTest.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.metadata;
+
+import org.apache.helix.ZNRecord;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+/**
+ * Tests for converting {@link RealtimeToOfflineSegmentsTaskMetadata} to and from ZNRecord
+ */
+public class RealtimeToOfflineSegmentsTaskMetadataTest {
+
+  @Test
+  public void testToFromZNRecord() {
+    RealtimeToOfflineSegmentsTaskMetadata metadata =
+        new RealtimeToOfflineSegmentsTaskMetadata("testTable_REALTIME", 1000);
+    ZNRecord znRecord = metadata.toZNRecord();
+    assertEquals(znRecord.getId(), "testTable_REALTIME");
+    assertEquals(znRecord.getSimpleField("watermarkMs"), "1000");
+
+    RealtimeToOfflineSegmentsTaskMetadata 
realtimeToOfflineSegmentsTaskMetadata =
+        RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(znRecord);
+    assertEquals(realtimeToOfflineSegmentsTaskMetadata.getTableNameWithType(), 
"testTable_REALTIME");
+    assertEquals(realtimeToOfflineSegmentsTaskMetadata.getWatermarkMs(), 1000);
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoProvider.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
similarity index 70%
rename from 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoProvider.java
rename to 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
index 678d10d..8d3db71 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoProvider.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
@@ -23,11 +23,15 @@ import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.helix.task.TaskState;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.minion.MinionTaskMetadataUtils;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import 
org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
+import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.core.minion.PinotTaskConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
@@ -37,12 +41,12 @@ import org.apache.pinot.spi.data.Schema;
  * The class <code>ClusterInfoProvider</code> is an abstraction on top of 
{@link PinotHelixResourceManager} and
  * {@link PinotHelixTaskResourceManager} which provides cluster information 
for {@link PinotTaskGenerator}.
  */
-public class ClusterInfoProvider {
+public class ClusterInfoAccessor {
   private final PinotHelixResourceManager _pinotHelixResourceManager;
   private final PinotHelixTaskResourceManager _pinotHelixTaskResourceManager;
   private final ControllerConf _controllerConf;
 
-  public ClusterInfoProvider(PinotHelixResourceManager 
pinotHelixResourceManager,
+  public ClusterInfoAccessor(PinotHelixResourceManager 
pinotHelixResourceManager,
       PinotHelixTaskResourceManager pinotHelixTaskResourceManager, 
ControllerConf controllerConf) {
     _pinotHelixResourceManager = pinotHelixResourceManager;
     _pinotHelixTaskResourceManager = pinotHelixTaskResourceManager;
@@ -94,6 +98,38 @@ public class ClusterInfoProvider {
   }
 
   /**
+   * Get all segment metadata for the given low-level (LLC) REALTIME table name.
+   *
+   * @param tableName Table name with or without REALTIME type suffix
+   * @return List of segment metadata
+   */
+  public List<LLCRealtimeSegmentZKMetadata> 
getLLCRealtimeSegmentsMetadata(String tableName) {
+    return ZKMetadataProvider
+        
.getLLCRealtimeSegmentZKMetadataListForTable(_pinotHelixResourceManager.getPropertyStore(),
 tableName);
+  }
+
+  /**
+   * Fetches the {@link RealtimeToOfflineSegmentsTaskMetadata} from MINION_TASK_METADATA for the given realtime table
+   * @param tableNameWithType realtime table name
+   */
+  public RealtimeToOfflineSegmentsTaskMetadata 
getMinionRealtimeToOfflineSegmentsTaskMetadata(
+      String tableNameWithType) {
+    return MinionTaskMetadataUtils
+        
.getRealtimeToOfflineSegmentsTaskMetadata(_pinotHelixResourceManager.getPropertyStore(),
+            MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, 
tableNameWithType);
+  }
+
+  /**
+   * Sets the {@link RealtimeToOfflineSegmentsTaskMetadata} into MINION_TASK_METADATA.
+   * This call will overwrite any existing metadata node.
+   */
+  public void setRealtimeToOfflineSegmentsTaskMetadata(
+      RealtimeToOfflineSegmentsTaskMetadata 
realtimeToOfflineSegmentsTaskMetadata) {
+    
MinionTaskMetadataUtils.persistRealtimeToOfflineSegmentsTaskMetadata(_pinotHelixResourceManager.getPropertyStore(),
+        MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, 
realtimeToOfflineSegmentsTaskMetadata, -1);
+  }
+
+  /**
    * Get all tasks' state for the given task type.
    *
    * @param taskType Task type
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index 93422f7..7a17718 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -48,7 +48,7 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotTaskManager.class);
 
   private final PinotHelixTaskResourceManager _helixTaskResourceManager;
-  private final ClusterInfoProvider _clusterInfoProvider;
+  private final ClusterInfoAccessor _clusterInfoAccessor;
   private final TaskGeneratorRegistry _taskGeneratorRegistry;
 
   public PinotTaskManager(PinotHelixTaskResourceManager 
helixTaskResourceManager,
@@ -58,8 +58,8 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
         controllerConf.getPinotTaskManagerInitialDelaySeconds(), 
helixResourceManager, leadControllerManager,
         controllerMetrics);
     _helixTaskResourceManager = helixTaskResourceManager;
-    _clusterInfoProvider = new ClusterInfoProvider(helixResourceManager, 
helixTaskResourceManager, controllerConf);
-    _taskGeneratorRegistry = new TaskGeneratorRegistry(_clusterInfoProvider);
+    _clusterInfoAccessor = new ClusterInfoAccessor(helixResourceManager, 
helixTaskResourceManager, controllerConf);
+    _taskGeneratorRegistry = new TaskGeneratorRegistry(_clusterInfoAccessor);
   }
 
   /**
@@ -69,8 +69,8 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
    *
    * @return Cluster info provider
    */
-  public ClusterInfoProvider getClusterInfoProvider() {
-    return _clusterInfoProvider;
+  public ClusterInfoAccessor getClusterInfoAccessor() {
+    return _clusterInfoAccessor;
   }
 
   /**
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/ConvertToRawIndexTaskGenerator.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/ConvertToRawIndexTaskGenerator.java
index dc4acf7..437ac93 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/ConvertToRawIndexTaskGenerator.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/ConvertToRawIndexTaskGenerator.java
@@ -26,7 +26,7 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.pinot.common.data.Segment;
 import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
-import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
 import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.core.minion.PinotTaskConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -39,10 +39,10 @@ import org.slf4j.LoggerFactory;
 public class ConvertToRawIndexTaskGenerator implements PinotTaskGenerator {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ConvertToRawIndexTaskGenerator.class);
 
-  private final ClusterInfoProvider _clusterInfoProvider;
+  private final ClusterInfoAccessor _clusterInfoAccessor;
 
-  public ConvertToRawIndexTaskGenerator(ClusterInfoProvider 
clusterInfoProvider) {
-    _clusterInfoProvider = clusterInfoProvider;
+  public ConvertToRawIndexTaskGenerator(ClusterInfoAccessor 
clusterInfoAccessor) {
+    _clusterInfoAccessor = clusterInfoAccessor;
   }
 
   @Override
@@ -56,7 +56,7 @@ public class ConvertToRawIndexTaskGenerator implements 
PinotTaskGenerator {
 
     // Get the segments that are being converted so that we don't submit them 
again
     Set<Segment> runningSegments =
-        
TaskGeneratorUtils.getRunningSegments(MinionConstants.ConvertToRawIndexTask.TASK_TYPE,
 _clusterInfoProvider);
+        
TaskGeneratorUtils.getRunningSegments(MinionConstants.ConvertToRawIndexTask.TASK_TYPE,
 _clusterInfoAccessor);
 
     for (TableConfig tableConfig : tableConfigs) {
       // Only generate tasks for OFFLINE tables
@@ -90,7 +90,7 @@ public class ConvertToRawIndexTaskGenerator implements 
PinotTaskGenerator {
 
       // Generate tasks
       int tableNumTasks = 0;
-      for (OfflineSegmentZKMetadata offlineSegmentZKMetadata : 
_clusterInfoProvider
+      for (OfflineSegmentZKMetadata offlineSegmentZKMetadata : 
_clusterInfoAccessor
           .getOfflineSegmentsMetadata(offlineTableName)) {
         // Generate up to tableMaxNumTasks tasks each time for each table
         if (tableNumTasks == tableMaxNumTasks) {
@@ -111,7 +111,7 @@ public class ConvertToRawIndexTaskGenerator implements 
PinotTaskGenerator {
           configs.put(MinionConstants.TABLE_NAME_KEY, offlineTableName);
           configs.put(MinionConstants.SEGMENT_NAME_KEY, segmentName);
           configs.put(MinionConstants.DOWNLOAD_URL_KEY, 
offlineSegmentZKMetadata.getDownloadUrl());
-          configs.put(MinionConstants.UPLOAD_URL_KEY, 
_clusterInfoProvider.getVipUrl() + "/segments");
+          configs.put(MinionConstants.UPLOAD_URL_KEY, 
_clusterInfoAccessor.getVipUrl() + "/segments");
           configs.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY, 
String.valueOf(offlineSegmentZKMetadata.getCrc()));
           if (columnsToConvertConfig != null) {
             
configs.put(MinionConstants.ConvertToRawIndexTask.COLUMNS_TO_CONVERT_KEY, 
columnsToConvertConfig);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
new file mode 100644
index 0000000..b505d28
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
@@ -0,0 +1,312 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion.generator;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
+import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
+import org.apache.pinot.common.utils.CommonConstants.Segment;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
+import org.apache.pinot.core.common.MinionConstants;
+import 
org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A {@link PinotTaskGenerator} implementation for generating tasks of type {@link RealtimeToOfflineSegmentsTask}
+ *
+ * Tasks will be generated only for REALTIME tables.
+ * At any given time, only one task of this type should be generated per table.
+ *
+ * Steps:
+ *  - The watermarkMs is read from the {@link RealtimeToOfflineSegmentsTaskMetadata} ZNode
+ *  found at MINION_TASK_METADATA/RealtimeToOfflineSegmentsTask/tableNameWithType.
+ *  In case of cold-start, no ZNode will exist.
+ *  A new ZNode will be created, with watermarkMs set to the smallest start time found in the COMPLETED segments, rounded down to the bucket boundary
+ *
+ *  - The execution window for the task is calculated as
+ *  windowStartMs = watermarkMs, windowEndMs = windowStartMs + bucketTimeMs,
+ *  where bucketTime can be provided in the taskConfigs (default 1d)
+ *
+ *  - If the execution window is not older than bufferTimeMs, no task will be generated,
+ *  where bufferTime can be provided in the taskConfigs (default 2d)
+ *
+ *  - Segment metadata is scanned for all COMPLETED segments,
+ *  to pick those containing data in the window [windowStartMs, windowEndMs)
+ *
+ *  - There are special considerations for using the last completed segment of a partition:
+ *  such segments are checked for segment endTime, to ensure there is no overflow into CONSUMING segments
+ *
+ *  - A PinotTaskConfig is created, with segment information, execution window, and any configs specific to the task
+ */
+public class RealtimeToOfflineSegmentsTaskGenerator implements 
PinotTaskGenerator {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskGenerator.class);
+
+  private static final String DEFAULT_BUCKET_PERIOD = "1d";
+  private static final String DEFAULT_BUFFER_PERIOD = "2d";
+
+  private final ClusterInfoAccessor _clusterInfoAccessor;
+
+  public RealtimeToOfflineSegmentsTaskGenerator(ClusterInfoAccessor 
clusterInfoAccessor) {
+    _clusterInfoAccessor = clusterInfoAccessor;
+  }
+
+  @Override
+  public String getTaskType() {
+    return RealtimeToOfflineSegmentsTask.TASK_TYPE;
+  }
+
+  @Override
+  public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
+    String taskType = RealtimeToOfflineSegmentsTask.TASK_TYPE;
+    List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+
+    for (TableConfig tableConfig : tableConfigs) {
+      String realtimeTableName = tableConfig.getTableName();
+
+      if (tableConfig.getTableType() != TableType.REALTIME) {
+        LOGGER.warn("Skip generating task: {} for non-REALTIME table: {}", 
taskType, realtimeTableName);
+        continue;
+      }
+      if (new StreamConfig(realtimeTableName, 
tableConfig.getIndexingConfig().getStreamConfigs())
+          .hasHighLevelConsumerType()) {
+        LOGGER.warn("Skip generating task: {} for HLC REALTIME table: {}", 
taskType, realtimeTableName);
+        continue;
+      }
+      LOGGER.info("Start generating task configs for table: {} for task: {}", 
realtimeTableName, taskType);
+
+      // Only schedule 1 task of this type, per table
+      Map<String, TaskState> incompleteTasks =
+          TaskGeneratorUtils.getIncompleteTasks(taskType, realtimeTableName, 
_clusterInfoAccessor);
+      if (!incompleteTasks.isEmpty()) {
+        LOGGER
+            .warn("Found incomplete tasks: {} for same table: {}. Skipping 
task generation.", incompleteTasks.keySet(),
+                realtimeTableName);
+        continue;
+      }
+
+      // Get all segment metadata for completed segments (DONE status).
+      List<LLCRealtimeSegmentZKMetadata> completedSegmentsMetadata = new 
ArrayList<>();
+      Map<Integer, String> partitionToLatestCompletedSegmentName = new 
HashMap<>();
+      Set<Integer> allPartitions = new HashSet<>();
+      getCompletedSegmentsInfo(realtimeTableName, completedSegmentsMetadata, 
partitionToLatestCompletedSegmentName,
+          allPartitions);
+      if (completedSegmentsMetadata.isEmpty()) {
+        LOGGER
+            .info("No realtime-completed segments found for table: {}, 
skipping task generation: {}", realtimeTableName,
+                taskType);
+        continue;
+      }
+      allPartitions.removeAll(partitionToLatestCompletedSegmentName.keySet());
+      if (!allPartitions.isEmpty()) {
+        LOGGER
+            .info("Partitions: {} have no completed segments. Table: {} is not 
ready for {}. Skipping task generation.",
+                allPartitions, realtimeTableName, taskType);
+        continue;
+      }
+
+      TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
+      Preconditions.checkState(tableTaskConfig != null);
+      Map<String, String> taskConfigs = 
tableTaskConfig.getConfigsForTaskType(taskType);
+      Preconditions.checkState(taskConfigs != null, "Task config shouldn't be 
null for table: {}", realtimeTableName);
+
+      // Get the bucket size and buffer
+      String bucketTimePeriod =
+          
taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUCKET_TIME_PERIOD_KEY, 
DEFAULT_BUCKET_PERIOD);
+      String bufferTimePeriod =
+          
taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUFFER_TIME_PERIOD_KEY, 
DEFAULT_BUFFER_PERIOD);
+      long bucketMs = TimeUtils.convertPeriodToMillis(bucketTimePeriod);
+      long bufferMs = TimeUtils.convertPeriodToMillis(bufferTimePeriod);
+
+      // Get the watermark from the RealtimeToOfflineSegmentsTaskMetadata ZNode. windowStartMs = watermark, windowEndMs = windowStartMs + bucketMs.
+      long windowStartMs = getWatermarkMs(realtimeTableName, 
completedSegmentsMetadata, bucketMs);
+      long windowEndMs = windowStartMs + bucketMs;
+
+      // Check that execution window is older than bufferTime
+      if (windowEndMs > System.currentTimeMillis() - bufferMs) {
+        LOGGER.info(
+            "Window with start: {} and end: {} is not older than buffer time: 
{} configured as {} ago. Skipping task generation: {}",
+            windowStartMs, windowEndMs, bufferMs, bufferTimePeriod, taskType);
+        continue;
+      }
+
+      // Find all COMPLETED segments with data overlapping the execution window: windowStart (inclusive) to windowEnd (exclusive)
+      List<String> segmentNames = new ArrayList<>();
+      List<String> downloadURLs = new ArrayList<>();
+      Set<String> lastCompletedSegmentPerPartition = new 
HashSet<>(partitionToLatestCompletedSegmentName.values());
+      boolean skipGenerate = false;
+      for (LLCRealtimeSegmentZKMetadata realtimeSegmentZKMetadata : 
completedSegmentsMetadata) {
+        String segmentName = realtimeSegmentZKMetadata.getSegmentName();
+        TimeUnit timeUnit = realtimeSegmentZKMetadata.getTimeUnit();
+        long segmentStartTimeMs = 
timeUnit.toMillis(realtimeSegmentZKMetadata.getStartTime());
+        long segmentEndTimeMs = 
timeUnit.toMillis(realtimeSegmentZKMetadata.getEndTime());
+
+        // Check overlap with window
+        if (windowStartMs <= segmentEndTimeMs && segmentStartTimeMs < 
windowEndMs) {
+          // If the last completed segment of a partition is being used, make sure that segment crosses over the end of the window.
+          // In the absence of this check, CONSUMING segments could contain some portion of the window. That data would be skipped forever.
+          if (lastCompletedSegmentPerPartition.contains(segmentName) && 
segmentEndTimeMs < windowEndMs) {
+            LOGGER.info(
+                "Window data overflows into CONSUMING segments for partition 
of segment: {}. Skipping task generation: {}",
+                segmentName, taskType);
+            skipGenerate = true;
+            break;
+          }
+          segmentNames.add(segmentName);
+          downloadURLs.add(realtimeSegmentZKMetadata.getDownloadUrl());
+        }
+      }
+
+      if (segmentNames.isEmpty() || skipGenerate) {
+        LOGGER.info("Found no eligible segments for task: {} with window [{} - 
{}). Skipping task generation", taskType,
+            windowStartMs, windowEndMs);
+        continue;
+      }
+
+      Map<String, String> configs = new HashMap<>();
+      configs.put(MinionConstants.TABLE_NAME_KEY, realtimeTableName);
+      configs.put(MinionConstants.SEGMENT_NAME_KEY, 
StringUtils.join(segmentNames, ","));
+      configs.put(MinionConstants.DOWNLOAD_URL_KEY, 
StringUtils.join(downloadURLs, MinionConstants.URL_SEPARATOR));
+      configs.put(MinionConstants.UPLOAD_URL_KEY, 
_clusterInfoAccessor.getVipUrl() + "/segments");
+
+      // Execution window
+      configs.put(RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY, 
String.valueOf(windowStartMs));
+      configs.put(RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY, 
String.valueOf(windowEndMs));
+
+      // Segment processor configs
+      String timeColumnTransformationConfig =
+          
taskConfigs.get(RealtimeToOfflineSegmentsTask.TIME_COLUMN_TRANSFORM_FUNCTION_KEY);
+      if (timeColumnTransformationConfig != null) {
+        
configs.put(RealtimeToOfflineSegmentsTask.TIME_COLUMN_TRANSFORM_FUNCTION_KEY, 
timeColumnTransformationConfig);
+      }
+      String collectorTypeConfig = 
taskConfigs.get(RealtimeToOfflineSegmentsTask.COLLECTOR_TYPE_KEY);
+      if (collectorTypeConfig != null) {
+        configs.put(RealtimeToOfflineSegmentsTask.COLLECTOR_TYPE_KEY, 
collectorTypeConfig);
+      }
+      for (Map.Entry<String, String> entry : taskConfigs.entrySet()) {
+        if 
(entry.getKey().endsWith(RealtimeToOfflineSegmentsTask.AGGREGATION_TYPE_KEY_SUFFIX))
 {
+          configs.put(entry.getKey(), entry.getValue());
+        }
+      }
+      String maxNumRecordsPerSegmentConfig =
+          
taskConfigs.get(RealtimeToOfflineSegmentsTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY);
+      if (maxNumRecordsPerSegmentConfig != null) {
+        
configs.put(RealtimeToOfflineSegmentsTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY, 
maxNumRecordsPerSegmentConfig);
+      }
+
+      pinotTaskConfigs.add(new PinotTaskConfig(taskType, configs));
+      LOGGER.info("Finished generating task configs for table: {} for task: 
{}", realtimeTableName, taskType);
+    }
+    return pinotTaskConfigs;
+  }
+
+  /**
+   * Fetch completed (non-consuming) segment and partition information
+   * @param realtimeTableName the realtime table name
+   * @param completedSegmentsMetadataList list for collecting the completed segments' metadata
+   * @param partitionToLatestCompletedSegmentName map for collecting the partitionId to the latest completed segment name
+   * @param allPartitions set for collecting all partition ids
+   */
+  private void getCompletedSegmentsInfo(String realtimeTableName,
+      List<LLCRealtimeSegmentZKMetadata> completedSegmentsMetadataList,
+      Map<Integer, String> partitionToLatestCompletedSegmentName, Set<Integer> 
allPartitions) {
+    List<LLCRealtimeSegmentZKMetadata> realtimeSegmentsMetadataList =
+        _clusterInfoAccessor.getLLCRealtimeSegmentsMetadata(realtimeTableName);
+
+    Map<Integer, LLCSegmentName> latestLLCSegmentNameMap = new HashMap<>();
+    for (LLCRealtimeSegmentZKMetadata metadata : realtimeSegmentsMetadataList) 
{
+      LLCSegmentName llcSegmentName = new 
LLCSegmentName(metadata.getSegmentName());
+      allPartitions.add(llcSegmentName.getPartitionId());
+
+      if (metadata.getStatus().equals(Segment.Realtime.Status.DONE)) {
+        completedSegmentsMetadataList.add(metadata);
+        latestLLCSegmentNameMap.compute(llcSegmentName.getPartitionId(), 
(partitionId, latestLLCSegmentName) -> {
+          if (latestLLCSegmentName == null) {
+            return llcSegmentName;
+          } else {
+            if (llcSegmentName.getSequenceNumber() > 
latestLLCSegmentName.getSequenceNumber()) {
+              return llcSegmentName;
+            } else {
+              return latestLLCSegmentName;
+            }
+          }
+        });
+      }
+    }
+
+    for (Map.Entry<Integer, LLCSegmentName> entry : 
latestLLCSegmentNameMap.entrySet()) {
+      partitionToLatestCompletedSegmentName.put(entry.getKey(), 
entry.getValue().getSegmentName());
+    }
+  }
+
+  /**
+   * Get the watermark from the RealtimeToOfflineSegmentsTaskMetadata ZNode.
+   * If the ZNode does not exist (cold-start), computes the watermark from the earliest segment start time, rounds it down to the bucket boundary, and persists a new ZNode
+   */
+  private long getWatermarkMs(String realtimeTableName, 
List<LLCRealtimeSegmentZKMetadata> completedSegmentsMetadata,
+      long bucketMs) {
+    RealtimeToOfflineSegmentsTaskMetadata 
realtimeToOfflineSegmentsTaskMetadata =
+        
_clusterInfoAccessor.getMinionRealtimeToOfflineSegmentsTaskMetadata(realtimeTableName);
+
+    if (realtimeToOfflineSegmentsTaskMetadata == null) {
+      // No ZNode exists. Cold-start.
+      long watermarkMs;
+
+      // Find the smallest time from all segments
+      RealtimeSegmentZKMetadata minSegmentZkMetadata = null;
+      for (LLCRealtimeSegmentZKMetadata realtimeSegmentZKMetadata : 
completedSegmentsMetadata) {
+        if (minSegmentZkMetadata == null || 
realtimeSegmentZKMetadata.getStartTime() < minSegmentZkMetadata
+            .getStartTime()) {
+          minSegmentZkMetadata = realtimeSegmentZKMetadata;
+        }
+      }
+      Preconditions.checkState(minSegmentZkMetadata != null);
+
+      // Convert the segment minTime to millis
+      long minSegmentStartTimeMs = 
minSegmentZkMetadata.getTimeUnit().toMillis(minSegmentZkMetadata.getStartTime());
+
+      // Round off according to the bucket. This ensures we align the offline segments to proper time boundaries.
+      // For example, if the start time is 20200813T12:34:59, we want to create the first segment for the window [20200813, 20200814)
+      watermarkMs = (minSegmentStartTimeMs / bucketMs) * bucketMs;
+
+      // Create the RealtimeToOfflineSegmentsTaskMetadata ZNode using the watermark calculated above
+      realtimeToOfflineSegmentsTaskMetadata = new 
RealtimeToOfflineSegmentsTaskMetadata(realtimeTableName, watermarkMs);
+      
_clusterInfoAccessor.setRealtimeToOfflineSegmentsTaskMetadata(realtimeToOfflineSegmentsTaskMetadata);
+    }
+    return realtimeToOfflineSegmentsTaskMetadata.getWatermarkMs();
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java
index ff8d37e..f112d8b 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java
@@ -23,7 +23,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nonnull;
-import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
 import 
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
 
 
@@ -33,8 +33,9 @@ import 
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManag
 public class TaskGeneratorRegistry {
   private final Map<String, PinotTaskGenerator> _taskGeneratorRegistry = new 
HashMap<>();
 
-  public TaskGeneratorRegistry(@Nonnull ClusterInfoProvider 
clusterInfoProvider) {
-    registerTaskGenerator(new 
ConvertToRawIndexTaskGenerator(clusterInfoProvider));
+  public TaskGeneratorRegistry(@Nonnull ClusterInfoAccessor 
clusterInfoAccessor) {
+    registerTaskGenerator(new 
ConvertToRawIndexTaskGenerator(clusterInfoAccessor));
+    registerTaskGenerator(new 
RealtimeToOfflineSegmentsTaskGenerator(clusterInfoAccessor));
   }
 
   /**
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorUtils.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorUtils.java
index 31f0c70..e4878a9 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorUtils.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorUtils.java
@@ -18,13 +18,14 @@
  */
 package org.apache.pinot.controller.helix.core.minion.generator;
 
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nonnull;
 import org.apache.helix.task.TaskState;
 import org.apache.pinot.common.data.Segment;
-import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
 import 
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
 import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.core.minion.PinotTaskConfig;
@@ -39,13 +40,13 @@ public class TaskGeneratorUtils {
    * NOTE: we consider tasks not finished in one day as stuck and don't count 
the segments in them
    *
    * @param taskType Task type
-   * @param clusterInfoProvider Cluster info provider
+   * @param clusterInfoAccessor Cluster info accessor
    * @return Set of running segments
    */
   public static Set<Segment> getRunningSegments(@Nonnull String taskType,
-      @Nonnull ClusterInfoProvider clusterInfoProvider) {
+      @Nonnull ClusterInfoAccessor clusterInfoAccessor) {
     Set<Segment> runningSegments = new HashSet<>();
-    Map<String, TaskState> taskStates = 
clusterInfoProvider.getTaskStates(taskType);
+    Map<String, TaskState> taskStates = 
clusterInfoAccessor.getTaskStates(taskType);
     for (Map.Entry<String, TaskState> entry : taskStates.entrySet()) {
       // Skip COMPLETED tasks
       if (entry.getValue() == TaskState.COMPLETED) {
@@ -54,13 +55,11 @@ public class TaskGeneratorUtils {
 
       // Skip tasks scheduled for more than one day
       String taskName = entry.getKey();
-      long scheduleTimeMs = Long.parseLong(
-          
taskName.substring(taskName.lastIndexOf(PinotHelixTaskResourceManager.TASK_NAME_SEPARATOR)
 + 1));
-      if (System.currentTimeMillis() - scheduleTimeMs > ONE_DAY_IN_MILLIS) {
+      if (isTaskOlderThanOneDay(taskName)) {
         continue;
       }
 
-      for (PinotTaskConfig pinotTaskConfig : 
clusterInfoProvider.getTaskConfigs(entry.getKey())) {
+      for (PinotTaskConfig pinotTaskConfig : 
clusterInfoAccessor.getTaskConfigs(entry.getKey())) {
         Map<String, String> configs = pinotTaskConfig.getConfigs();
         runningSegments.add(
             new Segment(configs.get(MinionConstants.TABLE_NAME_KEY), 
configs.get(MinionConstants.SEGMENT_NAME_KEY)));
@@ -68,4 +67,41 @@ public class TaskGeneratorUtils {
     }
     return runningSegments;
   }
+
+  /**
+   * Gets all the tasks for the provided task type and tableNameWithType which are not in TaskState COMPLETED
+   * @return map from task name to task state for the non-completed tasks
+   *
+   * NOTE: tasks not finished within one day are considered stuck and are not counted
+   */
+  public static Map<String, TaskState> getIncompleteTasks(String taskType, 
String tableNameWithType,
+      ClusterInfoAccessor clusterInfoAccessor) {
+
+    Map<String, TaskState> nonCompletedTasks = new HashMap<>();
+    Map<String, TaskState> taskStates = 
clusterInfoAccessor.getTaskStates(taskType);
+    for (Map.Entry<String, TaskState> entry : taskStates.entrySet()) {
+      if (entry.getValue() == TaskState.COMPLETED) {
+        continue;
+      }
+      String taskName = entry.getKey();
+      if (isTaskOlderThanOneDay(taskName)) {
+        continue;
+      }
+      for (PinotTaskConfig pinotTaskConfig : 
clusterInfoAccessor.getTaskConfigs(entry.getKey())) {
+        if 
(tableNameWithType.equals(pinotTaskConfig.getConfigs().get(MinionConstants.TABLE_NAME_KEY)))
 {
+          nonCompletedTasks.put(entry.getKey(), entry.getValue());
+        }
+      }
+    }
+    return nonCompletedTasks;
+  }
+
+  /**
+   * Returns true if task's schedule time is older than 1d
+   */
+  private static boolean isTaskOlderThanOneDay(String taskName) {
+    long scheduleTimeMs =
+        
Long.parseLong(taskName.substring(taskName.lastIndexOf(PinotHelixTaskResourceManager.TASK_NAME_SEPARATOR)
 + 1));
+    return System.currentTimeMillis() - scheduleTimeMs > ONE_DAY_IN_MILLIS;
+  }
 }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGeneratorTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGeneratorTest.java
new file mode 100644
index 0000000..5aa3377
--- /dev/null
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGeneratorTest.java
@@ -0,0 +1,452 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion.generator;
+
+import com.google.common.collect.Lists;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
+import org.apache.pinot.common.utils.CommonConstants.Segment.Realtime.Status;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
+import org.apache.pinot.core.common.MinionConstants;
+import 
org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Tests for {@link RealtimeToOfflineSegmentsTaskGenerator}
+ */
+public class RealtimeToOfflineSegmentsTaskGeneratorTest {
+
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String REALTIME_TABLE_NAME = "testTable_REALTIME";
+  private static final String TIME_COLUMN_NAME = "millisSinceEpoch";
+  private final Map<String, String> streamConfigs = new HashMap<>();
+
+  @BeforeClass
+  public void setup() {
+    streamConfigs.put(StreamConfigProperties.STREAM_TYPE, "kafka");
+    streamConfigs
+        .put(StreamConfigProperties.constructStreamProperty("kafka", 
StreamConfigProperties.STREAM_CONSUMER_TYPES),
+            StreamConfig.ConsumerType.LOWLEVEL.toString());
+    streamConfigs.put(StreamConfigProperties.constructStreamProperty("kafka", 
StreamConfigProperties.STREAM_TOPIC_NAME),
+        "myTopic");
+    streamConfigs
+        .put(StreamConfigProperties.constructStreamProperty("kafka", 
StreamConfigProperties.STREAM_DECODER_CLASS),
+            "org.foo.Decoder");
+  }
+
+  private TableConfig getRealtimeTableConfig(Map<String, Map<String, String>> 
taskConfigsMap) {
+    return new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME)
+        .setStreamConfigs(streamConfigs).setTaskConfig(new 
TableTaskConfig(taskConfigsMap)).build();
+  }
+
+  /**
+   * Tests for some config checks
+   */
+  @Test
+  public void testGenerateTasksCheckConfigs() {
+    ClusterInfoAccessor mockClusterInfoProvide = 
mock(ClusterInfoAccessor.class);
+
+    
when(mockClusterInfoProvide.getTaskStates(RealtimeToOfflineSegmentsTask.TASK_TYPE)).thenReturn(new
 HashMap<>());
+    LLCRealtimeSegmentZKMetadata metadata1 =
+        getRealtimeSegmentZKMetadata("testTable__0__0__12345", Status.DONE, 
5000, 50_000, TimeUnit.MILLISECONDS, null);
+    
when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
+        .thenReturn(Lists.newArrayList(metadata1));
+
+    RealtimeToOfflineSegmentsTaskGenerator generator =
+        new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+
+    // Skip task generation, if offline table
+    TableConfig offlineTableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+    List<PinotTaskConfig> pinotTaskConfigs = 
generator.generateTasks(Lists.newArrayList(offlineTableConfig));
+    assertTrue(pinotTaskConfigs.isEmpty());
+
+    // No tableTaskConfig, error
+    TableConfig realtimeTableConfig = getRealtimeTableConfig(new HashMap<>());
+    realtimeTableConfig.setTaskConfig(null);
+    try {
+      generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+      Assert.fail("Should have failed for null tableTaskConfig");
+    } catch (IllegalStateException e) {
+      // expected
+    }
+
+    // No taskConfig for task, error
+    realtimeTableConfig = getRealtimeTableConfig(new HashMap<>());
+    try {
+      generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+      Assert.fail("Should have failed for null taskConfig");
+    } catch (IllegalStateException e) {
+      // expected
+    }
+  }
+
+  /**
+   * Tests constraints on simultaneously scheduled tasks
+   */
+  @Test
+  public void testGenerateTasksSimultaneousConstraints() {
+    Map<String, Map<String, String>> taskConfigsMap = new HashMap<>();
+    taskConfigsMap.put(RealtimeToOfflineSegmentsTask.TASK_TYPE, new 
HashMap<>());
+    TableConfig realtimeTableConfig = getRealtimeTableConfig(taskConfigsMap);
+
+    ClusterInfoAccessor mockClusterInfoProvide = 
mock(ClusterInfoAccessor.class);
+    Map<String, TaskState> taskStatesMap = new HashMap<>();
+    String taskName = "Task_RealtimeToOfflineSegmentsTask_" + 
System.currentTimeMillis();
+    Map<String, String> taskConfigs = new HashMap<>();
+    taskConfigs.put(MinionConstants.TABLE_NAME_KEY, REALTIME_TABLE_NAME);
+    
when(mockClusterInfoProvide.getTaskStates(RealtimeToOfflineSegmentsTask.TASK_TYPE)).thenReturn(taskStatesMap);
+    when(mockClusterInfoProvide.getTaskConfigs(taskName))
+        .thenReturn(Lists.newArrayList(new 
PinotTaskConfig(RealtimeToOfflineSegmentsTask.TASK_TYPE, taskConfigs)));
+    
when(mockClusterInfoProvide.getMinionRealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME))
+        .thenReturn(new 
RealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME, 100_000L));
+    LLCRealtimeSegmentZKMetadata metadata1 =
+        getRealtimeSegmentZKMetadata("testTable__0__0__12345", Status.DONE, 
80_000_000, 90_000_000,
+            TimeUnit.MILLISECONDS, null);
+    
when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
+        .thenReturn(Lists.newArrayList(metadata1));
+
+    RealtimeToOfflineSegmentsTaskGenerator generator =
+        new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+
+    // if same task and table, IN_PROGRESS, then don't generate again
+    taskStatesMap.put(taskName, TaskState.IN_PROGRESS);
+    List<PinotTaskConfig> pinotTaskConfigs = 
generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+    assertTrue(pinotTaskConfigs.isEmpty());
+
+    // if same task and table, but COMPLETED, generate
+    taskStatesMap.put(taskName, TaskState.COMPLETED);
+    pinotTaskConfigs = 
generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+    assertEquals(pinotTaskConfigs.size(), 1);
+
+    // if same task and table, IN_PROGRESS, but older than 1 day, generate
+    String oldTaskName =
+        "Task_RealtimeToOfflineSegmentsTask_" + (System.currentTimeMillis() - 
TimeUnit.DAYS.toMillis(3));
+    taskStatesMap.remove(taskName);
+    taskStatesMap.put(oldTaskName, TaskState.IN_PROGRESS);
+    pinotTaskConfigs = 
generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+    assertEquals(pinotTaskConfigs.size(), 1);
+  }
+
+  /**
+   * Tests for realtime table with no segments
+   */
+  @Test
+  public void testGenerateTasksNoSegments() {
+    Map<String, Map<String, String>> taskConfigsMap = new HashMap<>();
+    taskConfigsMap.put(RealtimeToOfflineSegmentsTask.TASK_TYPE, new 
HashMap<>());
+    TableConfig realtimeTableConfig = getRealtimeTableConfig(taskConfigsMap);
+
+    // No segments in table
+    ClusterInfoAccessor mockClusterInfoProvide = 
mock(ClusterInfoAccessor.class);
+    
when(mockClusterInfoProvide.getTaskStates(RealtimeToOfflineSegmentsTask.TASK_TYPE)).thenReturn(new
 HashMap<>());
+    
when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME)).thenReturn(Lists.newArrayList());
+
+    RealtimeToOfflineSegmentsTaskGenerator generator =
+        new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+    List<PinotTaskConfig> pinotTaskConfigs = 
generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+    assertTrue(pinotTaskConfigs.isEmpty());
+
+    // No COMPLETED segments in table
+    LLCRealtimeSegmentZKMetadata seg1 =
+        getRealtimeSegmentZKMetadata("testTable__0__0__12345", 
Status.IN_PROGRESS, -1, -1, TimeUnit.MILLISECONDS, null);
+    
when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
+        .thenReturn(Lists.newArrayList(seg1));
+
+    generator = new 
RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+    pinotTaskConfigs = 
generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+    assertTrue(pinotTaskConfigs.isEmpty());
+
+    // 2 partitions. No COMPLETED segments for partition 0
+    LLCRealtimeSegmentZKMetadata seg2 =
+        getRealtimeSegmentZKMetadata("testTable__1__0__12345", Status.DONE, 
5000, 10000, TimeUnit.MILLISECONDS, null);
+    LLCRealtimeSegmentZKMetadata seg3 =
+        getRealtimeSegmentZKMetadata("testTable__1__1__13456", 
Status.IN_PROGRESS, -1, -1, TimeUnit.MILLISECONDS, null);
+    
when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
+        .thenReturn(Lists.newArrayList(seg1, seg2, seg3));
+
+    generator = new 
RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+    pinotTaskConfigs = 
generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+    assertTrue(pinotTaskConfigs.isEmpty());
+  }
+
+  /**
+   * Test cold start. No minion metadata exists. Watermark is calculated based 
on config or existing segments
+   */
+  @Test
+  public void testGenerateTasksNoMinionMetadata() {
+    ClusterInfoAccessor mockClusterInfoProvide = 
mock(ClusterInfoAccessor.class);
+    
when(mockClusterInfoProvide.getTaskStates(RealtimeToOfflineSegmentsTask.TASK_TYPE)).thenReturn(new
 HashMap<>());
+    
when(mockClusterInfoProvide.getMinionRealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME)).thenReturn(null);
+    LLCRealtimeSegmentZKMetadata seg1 =
+        getRealtimeSegmentZKMetadata("testTable__0__0__12345", Status.DONE, 
1590048000000L, 1590134400000L,
+            TimeUnit.MILLISECONDS, "download1"); // 21 May 2020 8am to 22 May 
2020 8am UTC
+    LLCRealtimeSegmentZKMetadata seg2 =
+        getRealtimeSegmentZKMetadata("testTable__1__0__12345", Status.DONE, 
1590048000000L, 1590134400000L,
+            TimeUnit.MILLISECONDS, "download2"); // 21 May 2020 8am to 22 May 
2020 8am UTC
+    
when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
+        .thenReturn(Lists.newArrayList(seg1, seg2));
+
+    // StartTime calculated using segment metadata
+    Map<String, Map<String, String>> taskConfigsMap = new HashMap<>();
+    taskConfigsMap.put(RealtimeToOfflineSegmentsTask.TASK_TYPE, new 
HashMap<>());
+    TableConfig realtimeTableConfig = getRealtimeTableConfig(taskConfigsMap);
+
+    RealtimeToOfflineSegmentsTaskGenerator generator =
+        new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+    List<PinotTaskConfig> pinotTaskConfigs = 
generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+    assertEquals(pinotTaskConfigs.size(), 1);
+    assertEquals(pinotTaskConfigs.get(0).getTaskType(), 
RealtimeToOfflineSegmentsTask.TASK_TYPE);
+    Map<String, String> configs = pinotTaskConfigs.get(0).getConfigs();
+    assertEquals(configs.get(MinionConstants.TABLE_NAME_KEY), 
REALTIME_TABLE_NAME);
+    assertEquals(configs.get(MinionConstants.SEGMENT_NAME_KEY), 
"testTable__0__0__12345,testTable__1__0__12345");
+    assertEquals(configs.get(MinionConstants.DOWNLOAD_URL_KEY), 
"download1,download2");
+    
assertEquals(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY), 
"1590019200000"); // 21 May 2020 UTC
+    assertEquals(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY), 
"1590105600000"); // 22 May 2020 UTC
+
+    // Segment metadata in hoursSinceEpoch
+    seg1 = getRealtimeSegmentZKMetadata("testTable__0__0__12345", Status.DONE, 
441680L, 441703L, TimeUnit.HOURS,
+        "download1"); // 21 May 2020 8am to 22 May 2020 8am UTC
+    seg2 = getRealtimeSegmentZKMetadata("testTable__1__0__12345", Status.DONE, 
441680L, 441703L, TimeUnit.HOURS,
+        "download2"); // 21 May 2020 8am to 22 May 2020 8am UTC
+    
when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
+        .thenReturn(Lists.newArrayList(seg1, seg2));
+    generator = new 
RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+    pinotTaskConfigs = 
generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+    assertEquals(pinotTaskConfigs.size(), 1);
+    assertEquals(pinotTaskConfigs.get(0).getTaskType(), 
RealtimeToOfflineSegmentsTask.TASK_TYPE);
+    configs = pinotTaskConfigs.get(0).getConfigs();
+    assertEquals(configs.get(MinionConstants.TABLE_NAME_KEY), 
REALTIME_TABLE_NAME);
+    assertEquals(configs.get(MinionConstants.SEGMENT_NAME_KEY), 
"testTable__0__0__12345,testTable__1__0__12345");
+    assertEquals(configs.get(MinionConstants.DOWNLOAD_URL_KEY), 
"download1,download2");
+    
assertEquals(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY), 
"1590019200000"); // 21 May 2020 UTC
+    assertEquals(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY), 
"1590105600000");  // 22 May 2020 UTC
+  }
+
+  /**
+   * Tests for subsequent runs after cold start
+   */
+  @Test
+  public void testGenerateTasksWithMinionMetadata() {
+    ClusterInfoAccessor mockClusterInfoProvide = 
mock(ClusterInfoAccessor.class);
+    
when(mockClusterInfoProvide.getTaskStates(RealtimeToOfflineSegmentsTask.TASK_TYPE)).thenReturn(new
 HashMap<>());
+    
when(mockClusterInfoProvide.getMinionRealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME))
+        .thenReturn(new 
RealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME, 1590019200000L)); // 
21 May 2020 UTC
+    LLCRealtimeSegmentZKMetadata seg1 =
+        getRealtimeSegmentZKMetadata("testTable__0__0__12345", Status.DONE, 
1589972400000L, 1590048000000L,
+            TimeUnit.MILLISECONDS, "download1"); // 05-20-2020T11:00:00 to 
05-21-2020T08:00:00 UTC
+    LLCRealtimeSegmentZKMetadata seg2 =
+        getRealtimeSegmentZKMetadata("testTable__0__1__12345", Status.DONE, 
1590048000000L, 1590134400000L,
+            TimeUnit.MILLISECONDS, "download2"); // 05-21-2020T08:00:00 UTC to 
05-22-2020T08:00:00 UTC
+    
when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
+        .thenReturn(Lists.newArrayList(seg1, seg2));
+
+    // Default configs
+    Map<String, Map<String, String>> taskConfigsMap = new HashMap<>();
+    taskConfigsMap.put(RealtimeToOfflineSegmentsTask.TASK_TYPE, new 
HashMap<>());
+    TableConfig realtimeTableConfig = getRealtimeTableConfig(taskConfigsMap);
+
+    RealtimeToOfflineSegmentsTaskGenerator generator =
+        new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+    List<PinotTaskConfig> pinotTaskConfigs = 
generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+    assertEquals(pinotTaskConfigs.size(), 1);
+    assertEquals(pinotTaskConfigs.get(0).getTaskType(), 
RealtimeToOfflineSegmentsTask.TASK_TYPE);
+    Map<String, String> configs = pinotTaskConfigs.get(0).getConfigs();
+    assertEquals(configs.get(MinionConstants.TABLE_NAME_KEY), 
REALTIME_TABLE_NAME);
+    assertEquals(configs.get(MinionConstants.SEGMENT_NAME_KEY), 
"testTable__0__0__12345,testTable__0__1__12345");
+    assertEquals(configs.get(MinionConstants.DOWNLOAD_URL_KEY), 
"download1,download2");
+    
assertEquals(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY), 
"1590019200000"); // 5-21-2020
+    assertEquals(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY), 
"1590105600000"); // 5-22-2020
+
+    // No segments match
+    
when(mockClusterInfoProvide.getMinionRealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME))
+        .thenReturn(new 
RealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME, 1590490800000L)); // 
26 May 2020 UTC
+    generator = new 
RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+    pinotTaskConfigs = 
generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+    assertEquals(pinotTaskConfigs.size(), 0);
+
+    // Some segments match
+    
when(mockClusterInfoProvide.getMinionRealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME))
+        .thenReturn(new 
RealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME, 1590019200000L)); // 
21 May 2020 UTC
+    taskConfigsMap = new HashMap<>();
+    Map<String, String> taskConfigs = new HashMap<>();
+    taskConfigs.put(RealtimeToOfflineSegmentsTask.BUCKET_TIME_PERIOD_KEY, 
"2h");
+    taskConfigsMap.put(RealtimeToOfflineSegmentsTask.TASK_TYPE, taskConfigs);
+    realtimeTableConfig = getRealtimeTableConfig(taskConfigsMap);
+    pinotTaskConfigs = 
generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+    assertEquals(pinotTaskConfigs.size(), 1);
+    assertEquals(pinotTaskConfigs.get(0).getTaskType(), 
RealtimeToOfflineSegmentsTask.TASK_TYPE);
+    configs = pinotTaskConfigs.get(0).getConfigs();
+    assertEquals(configs.get(MinionConstants.TABLE_NAME_KEY), 
REALTIME_TABLE_NAME);
+    assertEquals(configs.get(MinionConstants.SEGMENT_NAME_KEY), 
"testTable__0__0__12345");
+    assertEquals(configs.get(MinionConstants.DOWNLOAD_URL_KEY), "download1");
+    
assertEquals(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY),
+        "1590019200000"); // 05-21-2020T00:00:00
+    assertEquals(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY), 
"1590026400000"); // 05-21-2020T02:00:00
+
+    // Segment Processor configs
+    taskConfigsMap = new HashMap<>();
+    taskConfigs = new HashMap<>();
+    
taskConfigs.put(RealtimeToOfflineSegmentsTask.TIME_COLUMN_TRANSFORM_FUNCTION_KEY,
 "foo");
+    taskConfigs.put(RealtimeToOfflineSegmentsTask.COLLECTOR_TYPE_KEY, 
"rollup");
+    taskConfigs.put("m1" + 
RealtimeToOfflineSegmentsTask.AGGREGATION_TYPE_KEY_SUFFIX, "MAX");
+    taskConfigsMap.put(RealtimeToOfflineSegmentsTask.TASK_TYPE, taskConfigs);
+    realtimeTableConfig = getRealtimeTableConfig(taskConfigsMap);
+    generator = new 
RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+    pinotTaskConfigs = 
generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+    assertEquals(pinotTaskConfigs.size(), 1);
+    assertEquals(pinotTaskConfigs.get(0).getTaskType(), 
RealtimeToOfflineSegmentsTask.TASK_TYPE);
+    configs = pinotTaskConfigs.get(0).getConfigs();
+    assertEquals(configs.get(MinionConstants.TABLE_NAME_KEY), 
REALTIME_TABLE_NAME);
+    assertEquals(configs.get(MinionConstants.SEGMENT_NAME_KEY), 
"testTable__0__0__12345,testTable__0__1__12345");
+    assertEquals(configs.get(MinionConstants.DOWNLOAD_URL_KEY), 
"download1,download2");
+    
assertEquals(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY),
+        "1590019200000"); // 05-21-2020T00:00:00
+    assertEquals(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY), 
"1590105600000"); // 05-22-2020T00:00:00
+    
assertEquals(configs.get(RealtimeToOfflineSegmentsTask.TIME_COLUMN_TRANSFORM_FUNCTION_KEY),
 "foo");
+    
assertEquals(configs.get(RealtimeToOfflineSegmentsTask.COLLECTOR_TYPE_KEY), 
"rollup");
+    assertEquals(configs.get("m1" + 
RealtimeToOfflineSegmentsTask.AGGREGATION_TYPE_KEY_SUFFIX), "MAX");
+  }
+
+  /**
+   * Tests that task generation is skipped when CONSUMING segments overlap with the window
+   */
+  @Test
+  public void testOverflowIntoConsuming() {
+    Map<String, Map<String, String>> taskConfigsMap = new HashMap<>();
+    taskConfigsMap.put(RealtimeToOfflineSegmentsTask.TASK_TYPE, new 
HashMap<>());
+    TableConfig realtimeTableConfig = getRealtimeTableConfig(taskConfigsMap);
+
+    ClusterInfoAccessor mockClusterInfoProvide = 
mock(ClusterInfoAccessor.class);
+    
when(mockClusterInfoProvide.getTaskStates(RealtimeToOfflineSegmentsTask.TASK_TYPE)).thenReturn(new
 HashMap<>());
+
+    
when(mockClusterInfoProvide.getMinionRealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME))
+        .thenReturn(new 
RealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME, 100_000L));
+    LLCRealtimeSegmentZKMetadata metadata1 =
+        getRealtimeSegmentZKMetadata("testTable__0__0__12345", Status.DONE, 
50_000, 150_000, TimeUnit.MILLISECONDS,
+            null);
+    LLCRealtimeSegmentZKMetadata metadata2 =
+        getRealtimeSegmentZKMetadata("testTable__0__1__12345", 
Status.IN_PROGRESS, -1, -1, TimeUnit.MILLISECONDS, null);
+    
when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
+        .thenReturn(Lists.newArrayList(metadata1, metadata2));
+
+    RealtimeToOfflineSegmentsTaskGenerator generator =
+        new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+
+    // Last COMPLETED segment's endTime is less than the windowEnd time, so a CONSUMING segment overlaps the window. Skip the task
+    List<PinotTaskConfig> pinotTaskConfigs = 
generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+    assertTrue(pinotTaskConfigs.isEmpty());
+
+    metadata1 =
+        getRealtimeSegmentZKMetadata("testTable__0__0__12345", Status.DONE, 
100_000, 200_000, TimeUnit.MILLISECONDS,
+            null);
+    metadata2 =
+        getRealtimeSegmentZKMetadata("testTable__0__1__12345", 
Status.IN_PROGRESS, -1, -1, TimeUnit.MILLISECONDS, null);
+    
when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
+        .thenReturn(Lists.newArrayList(metadata1, metadata2));
+    pinotTaskConfigs = 
generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+    assertTrue(pinotTaskConfigs.isEmpty());
+
+    // Last COMPLETED segment's endTime is exactly at the window end, so the task is allowed
+    metadata1 =
+        getRealtimeSegmentZKMetadata("testTable__0__0__12345", Status.DONE, 
200_000, 86_500_000, TimeUnit.MILLISECONDS,
+            null);
+    metadata2 =
+        getRealtimeSegmentZKMetadata("testTable__0__1__12345", 
Status.IN_PROGRESS, -1, -1, TimeUnit.MILLISECONDS, null);
+    
when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
+        .thenReturn(Lists.newArrayList(metadata1, metadata2));
+    pinotTaskConfigs = 
generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+    assertEquals(pinotTaskConfigs.size(), 1);
+  }
+
+  @Test
+  public void testBuffer() {
+    Map<String, Map<String, String>> taskConfigsMap = new HashMap<>();
+    taskConfigsMap.put(RealtimeToOfflineSegmentsTask.TASK_TYPE, new 
HashMap<>());
+    TableConfig realtimeTableConfig = getRealtimeTableConfig(taskConfigsMap);
+
+    // default buffer - 2d
+    long now = System.currentTimeMillis();
+    long watermarkMs = now - TimeUnit.DAYS.toMillis(1);
+    ClusterInfoAccessor mockClusterInfoProvide = 
mock(ClusterInfoAccessor.class);
+    
when(mockClusterInfoProvide.getTaskStates(RealtimeToOfflineSegmentsTask.TASK_TYPE)).thenReturn(new
 HashMap<>());
+    
when(mockClusterInfoProvide.getMinionRealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME))
+        .thenReturn(new 
RealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME, watermarkMs));
+    LLCRealtimeSegmentZKMetadata metadata1 =
+        getRealtimeSegmentZKMetadata("testTable__0__0__12345", Status.DONE, 
watermarkMs - 100, watermarkMs + 100,
+            TimeUnit.MILLISECONDS, null);
+    
when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
+        .thenReturn(Lists.newArrayList(metadata1));
+
+    RealtimeToOfflineSegmentsTaskGenerator generator =
+        new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+
+    List<PinotTaskConfig> pinotTaskConfigs = 
generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+    assertTrue(pinotTaskConfigs.isEmpty());
+
+    // custom buffer
+    Map<String, String> taskConfigs = new HashMap<>();
+    taskConfigs.put(RealtimeToOfflineSegmentsTask.BUFFER_TIME_PERIOD_KEY, 
"15d");
+    taskConfigsMap.put(RealtimeToOfflineSegmentsTask.TASK_TYPE, taskConfigs);
+    realtimeTableConfig = getRealtimeTableConfig(taskConfigsMap);
+
+    watermarkMs = now - TimeUnit.DAYS.toMillis(10);
+    
when(mockClusterInfoProvide.getMinionRealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME))
+        .thenReturn(new 
RealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME, watermarkMs));
+    metadata1 =
+        getRealtimeSegmentZKMetadata("testTable__0__0__12345", Status.DONE, 
watermarkMs - 100, watermarkMs + 100,
+            TimeUnit.MILLISECONDS, null);
+    
when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
+        .thenReturn(Lists.newArrayList(metadata1));
+
+    pinotTaskConfigs = 
generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+    assertTrue(pinotTaskConfigs.isEmpty());
+  }
+
+  private LLCRealtimeSegmentZKMetadata getRealtimeSegmentZKMetadata(String 
segmentName, Status status, long startTime,
+      long endTime, TimeUnit timeUnit, String downloadURL) {
+    LLCRealtimeSegmentZKMetadata realtimeSegmentZKMetadata = new 
LLCRealtimeSegmentZKMetadata();
+    realtimeSegmentZKMetadata.setSegmentName(segmentName);
+    realtimeSegmentZKMetadata.setStatus(status);
+    realtimeSegmentZKMetadata.setStartTime(startTime);
+    realtimeSegmentZKMetadata.setEndTime(endTime);
+    realtimeSegmentZKMetadata.setTimeUnit(timeUnit);
+    realtimeSegmentZKMetadata.setDownloadUrl(downloadURL);
+    return realtimeSegmentZKMetadata;
+  }
+}
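
For readers following the tests above, a minimal sketch of the windowing arithmetic they exercise is shown below. The method and variable names are illustrative only; the real generator adds further checks (cold-start watermark derivation, CONSUMING-segment overlap), which the tests cover individually.

    import java.util.concurrent.TimeUnit;

    // Hedged sketch, not the generator's actual implementation.
    final class WindowSelectionSketch {
      // Returns {windowStartMs, windowEndMs}, or null when the window still falls inside the buffer.
      static long[] selectWindow(long watermarkMs, long bucketMs, long bufferMs, long nowMs) {
        long windowStartMs = watermarkMs;            // resume from the stored watermark
        long windowEndMs = windowStartMs + bucketMs; // e.g. bucketTimePeriod "1d"
        if (windowEndMs > nowMs - bufferMs) {
          return null;                               // too recent, matches the testBuffer cases above
        }
        return new long[]{windowStartMs, windowEndMs};
      }

      public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long[] window = selectWindow(now - TimeUnit.DAYS.toMillis(3), TimeUnit.DAYS.toMillis(1),
            TimeUnit.DAYS.toMillis(2), now);
        System.out.println(window == null ? "skip" : window[0] + " -> " + window[1]);
      }
    }
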
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java 
b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index f2049ea..cd98833 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -62,16 +62,30 @@ public class MinionConstants {
     public static final String MERGED_SEGMENT_NAME_KEY = 
"mergedSegmentNameKey";
   }
 
+  /**
+   * Creates segments for the OFFLINE table, using completed segments from the 
corresponding REALTIME table
+   */
   public static class RealtimeToOfflineSegmentsTask {
-    public static final String TASK_TYPE = "realtimeToOfflineSegmentsTask";
-    // window
-    public static final String WINDOW_START_MILLIS_KEY = "windowStartMillis";
-    public static final String WINDOW_END_MILLIS_KEY = "windowEndMillis";
-    // segment processing
+    public static final String TASK_TYPE = "RealtimeToOfflineSegmentsTask";
+
+    /**
+     * The time window size for the task.
+     * e.g. if set to "1d", a task is scheduled to process a 1-day window
+     */
+    public static final String BUCKET_TIME_PERIOD_KEY = "bucketTimePeriod";
+    /**
+     * The time period to wait before picking segments for this task.
+     * e.g. if set to "2d", no task will be scheduled for a time window younger than 2 days
+     */
+    public static final String BUFFER_TIME_PERIOD_KEY = "bufferTimePeriod";
+
+    // Window start and window end set by task generator
+    public static final String WINDOW_START_MS_KEY = "windowStartMs";
+    public static final String WINDOW_END_MS_KEY = "windowEndMs";
+    // Segment processing related configs
     public static final String TIME_COLUMN_TRANSFORM_FUNCTION_KEY = 
"timeColumnTransformFunction";
     public static final String COLLECTOR_TYPE_KEY = "collectorType";
     public static final String AGGREGATION_TYPE_KEY_SUFFIX = 
".aggregationType";
     public static final String MAX_NUM_RECORDS_PER_SEGMENT_KEY = 
"maxNumRecordsPerSegment";
-
   }
 }
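
The config keys above are read from the table's task config. A minimal sketch of wiring them in (mirroring the TableTaskConfig usage in the integration test later in this patch; the "1d"/"2d" values are just examples):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask;
    import org.apache.pinot.spi.config.table.TableTaskConfig;

    final class RealtimeToOfflineTaskConfigSketch {
      static TableTaskConfig taskConfig() {
        Map<String, String> taskConfigs = new HashMap<>();
        // Process one day of data per run, and stay at least two days behind "now"
        taskConfigs.put(RealtimeToOfflineSegmentsTask.BUCKET_TIME_PERIOD_KEY, "1d");
        taskConfigs.put(RealtimeToOfflineSegmentsTask.BUFFER_TIME_PERIOD_KEY, "2d");
        return new TableTaskConfig(
            Collections.singletonMap(RealtimeToOfflineSegmentsTask.TASK_TYPE, taskConfigs));
      }
    }
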
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapper.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapper.java
index a947d29..a09f3b5 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapper.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapper.java
@@ -50,9 +50,9 @@ import org.slf4j.LoggerFactory;
  * Mapper phase of the SegmentProcessorFramework.
  * Reads the input segment and creates partitioned avro data files
  * Performs:
- * - record transformations
+ * - record filtering
+ * - column transformations
  * - partitioning
- * - partition filtering
  */
 public class SegmentMapper {
 
@@ -74,8 +74,8 @@ public class SegmentMapper {
 
     _mapperId = mapperId;
     _avroSchema = 
SegmentProcessorUtils.convertPinotSchemaToAvroSchema(mapperConfig.getPinotSchema());
-    _recordTransformer = 
RecordTransformerFactory.getRecordTransformer(mapperConfig.getRecordTransformerConfig());
     _recordFilter = 
RecordFilterFactory.getRecordFilter(mapperConfig.getRecordFilterConfig());
+    _recordTransformer = 
RecordTransformerFactory.getRecordTransformer(mapperConfig.getRecordTransformerConfig());
     for (PartitionerConfig partitionerConfig : 
mapperConfig.getPartitionerConfigs()) {
       _partitioners.add(PartitionerFactory.getPartitioner(partitionerConfig));
     }
@@ -101,14 +101,14 @@ public class SegmentMapper {
     while (segmentRecordReader.hasNext()) {
       reusableRow = segmentRecordReader.next(reusableRow);
 
-      // Record transformation
-      reusableRow = _recordTransformer.transformRecord(reusableRow);
-
       // Record filtering
       if (_recordFilter.filter(reusableRow)) {
         continue;
       }
 
+      // Record transformation
+      reusableRow = _recordTransformer.transformRecord(reusableRow);
+
       // Partitioning
       int p = 0;
       for (Partitioner partitioner : _partitioners) {
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
index 1856b9d..88857e8 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
@@ -275,7 +275,7 @@ public class SegmentMapperTest {
     SegmentMapperConfig config11 = new SegmentMapperConfig(_pinotSchema,
         new 
RecordTransformerConfig.Builder().setTransformFunctionsMap(transformFunctionMap).build(),
         new 
RecordFilterConfig.Builder().setRecordFilterType(RecordFilterFactory.RecordFilterType.FILTER_FUNCTION)
-            .setFilterFunction("Groovy({timeValue != 1597795200000}, 
timeValue)").build(), Lists.newArrayList(
+            .setFilterFunction("Groovy({timeValue < 1597795200000L || timeValue >= 1597881600000}, timeValue)").build(), Lists.newArrayList(
         new 
PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.COLUMN_VALUE)
             .setColumnName("timeValue").build()));
     Map<String, List<Object[]>> expectedRecords11 =
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
new file mode 100644
index 0000000..3f80e95
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
+import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import 
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
+import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Integration test for the minion task of type "RealtimeToOfflineSegmentsTask".
+ * Each task run creates a new segment in the offline table covering a 1-day window, and the watermark advances accordingly.
+ */
+public class RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends 
RealtimeClusterIntegrationTest {
+
+  private PinotHelixTaskResourceManager _helixTaskResourceManager;
+  private PinotTaskManager _taskManager;
+  private PinotHelixResourceManager _pinotHelixResourceManager;
+
+  private long _dataSmallestTimeMillis;
+  private long _dateSmallestDays;
+  private String _realtimeTableName;
+  private String _offlineTableName;
+
+  @Override
+  protected TableTaskConfig getTaskConfig() {
+    return new TableTaskConfig(
+        
Collections.singletonMap(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
 new HashMap<>()));
+  }
+
+  @Override
+  protected boolean useLlc() {
+    return true;
+  }
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    // Setup realtime table, and blank offline table
+    super.setUp();
+    addTableConfig(createOfflineTableConfig());
+    startMinion(null, null);
+
+    _helixTaskResourceManager = 
_controllerStarter.getHelixTaskResourceManager();
+    _taskManager = _controllerStarter.getTaskManager();
+    _pinotHelixResourceManager = _controllerStarter.getHelixResourceManager();
+
+    _realtimeTableName = 
TableNameBuilder.REALTIME.tableNameWithType(getTableName());
+    _offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
+
+    List<RealtimeSegmentZKMetadata> realtimeSegmentMetadata =
+        
_pinotHelixResourceManager.getRealtimeSegmentMetadata(_realtimeTableName);
+    long minSegmentTime = Long.MAX_VALUE;
+    for (RealtimeSegmentZKMetadata metadata : realtimeSegmentMetadata) {
+      if (metadata.getStatus() == 
CommonConstants.Segment.Realtime.Status.DONE) {
+        if (metadata.getStartTime() < minSegmentTime) {
+          minSegmentTime = metadata.getStartTime();
+        }
+      }
+    }
+    _dataSmallestTimeMillis = minSegmentTime;
+    _dateSmallestDays = minSegmentTime / 86400000;
+  }
+
+  @Test
+  public void testRealtimeToOfflineSegmentsTask() {
+
+    List<OfflineSegmentZKMetadata> offlineSegmentMetadata =
+        
_pinotHelixResourceManager.getOfflineSegmentMetadata(_offlineTableName);
+    Assert.assertTrue(offlineSegmentMetadata.isEmpty());
+
+    long expectedWatermark = _dataSmallestTimeMillis;
+    int numOfflineSegments = 0;
+    long offlineSegmentTime = _dateSmallestDays;
+    for (int i = 0; i < 3; i++) {
+      // Schedule task
+      Assert.assertTrue(
+          
_taskManager.scheduleTasks().containsKey(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
+      Assert.assertTrue(_helixTaskResourceManager.getTaskQueues().contains(
+          
PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)));
+      // Should not generate more tasks
+      Assert.assertFalse(
+          
_taskManager.scheduleTasks().containsKey(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
+
+      expectedWatermark = expectedWatermark + 86400000;
+      // Wait at most 600 seconds for all tasks to reach COMPLETED
+      waitForTaskToComplete(expectedWatermark);
+      // Check that the segments have landed in the offline table
+      offlineSegmentMetadata = 
_pinotHelixResourceManager.getOfflineSegmentMetadata(_offlineTableName);
+      Assert.assertEquals(offlineSegmentMetadata.size(), ++numOfflineSegments);
+      Assert.assertEquals(offlineSegmentMetadata.get(i).getStartTime(), 
offlineSegmentTime);
+      Assert.assertEquals(offlineSegmentMetadata.get(i).getEndTime(), 
offlineSegmentTime);
+      offlineSegmentTime++;
+    }
+    testHardcodedSqlQueries();
+  }
+
+  private void waitForTaskToComplete(long expectedWatermark) {
+    TestUtils.waitForCondition(input -> {
+      // Check task state
+      for (TaskState taskState : _helixTaskResourceManager
+          
.getTaskStates(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE).values())
 {
+        if (taskState != TaskState.COMPLETED) {
+          return false;
+        }
+      }
+      return true;
+    }, 600_000L, "Failed to complete task");
+
+    // Check segment ZK metadata
+    RealtimeToOfflineSegmentsTaskMetadata minionTaskMetadata =
+        
_taskManager.getClusterInfoAccessor().getMinionRealtimeToOfflineSegmentsTaskMetadata(_realtimeTableName);
+    Assert.assertNotNull(minionTaskMetadata);
+    Assert.assertEquals(minionTaskMetadata.getWatermarkMs(), 
expectedWatermark);
+  }
+
+  @Test(enabled = false)
+  public void testSegmentListApi() {
+  }
+
+  @Test(enabled = false)
+  public void testBrokerDebugOutput() {
+  }
+
+  @Test(enabled = false)
+  public void testBrokerDebugRoutingTableSQL() {
+  }
+
+  @Test(enabled = false)
+  public void testBrokerResponseMetadata() {
+  }
+
+  @Test(enabled = false)
+  public void testDictionaryBasedQueries() {
+  }
+
+  @Test(enabled = false)
+  public void testGeneratedQueriesWithMultiValues() {
+  }
+
+  @Test(enabled = false)
+  public void testGeneratedQueriesWithoutMultiValues() {
+  }
+
+  @Test(enabled = false)
+  public void testHardcodedQueries() {
+  }
+
+  @Test(enabled = false)
+  public void testHardcodedSqlQueries() {
+  }
+
+  @Test(enabled = false)
+  public void testInstanceShutdown() {
+  }
+
+  @Test(enabled = false)
+  public void testQueriesFromQueryFile() {
+  }
+
+  @Test(enabled = false)
+  public void testQueryExceptions() {
+  }
+
+  @Test(enabled = false)
+  public void testReload(boolean includeOfflineTable) {
+  }
+
+  @Test(enabled = false)
+  public void testSqlQueriesFromQueryFile() {
+  }
+
+  @Test(enabled = false)
+  public void testVirtualColumnQueries() {
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws Exception {
+    stopMinion();
+
+    super.tearDown();
+  }
+}
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
index a0be8ed..5232b7a 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
@@ -27,7 +27,7 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.Nullable;
 import org.apache.helix.task.TaskState;
-import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
 import 
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
 import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
 import 
org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
@@ -94,7 +94,7 @@ public class SimpleMinionClusterIntegrationTest extends 
ClusterTest {
     _taskManager = _controllerStarter.getTaskManager();
 
     // Register the test task generator into task manager
-    _taskManager.registerTaskGenerator(new 
TestTaskGenerator(_taskManager.getClusterInfoProvider()));
+    _taskManager.registerTaskGenerator(new 
TestTaskGenerator(_taskManager.getClusterInfoAccessor()));
 
     Map<String, PinotTaskExecutorFactory> taskExecutorFactoryRegistry =
         Collections.singletonMap(TestTaskGenerator.TASK_TYPE, new 
TestTaskExecutorFactory());
@@ -199,10 +199,10 @@ public class SimpleMinionClusterIntegrationTest extends 
ClusterTest {
   private static class TestTaskGenerator implements PinotTaskGenerator {
     public static final String TASK_TYPE = "TestTask";
 
-    private final ClusterInfoProvider _clusterInfoProvider;
+    private final ClusterInfoAccessor _clusterInfoAccessor;
 
-    public TestTaskGenerator(ClusterInfoProvider clusterInfoProvider) {
-      _clusterInfoProvider = clusterInfoProvider;
+    public TestTaskGenerator(ClusterInfoAccessor clusterInfoAccessor) {
+      _clusterInfoAccessor = clusterInfoAccessor;
     }
 
     @Override
@@ -215,7 +215,7 @@ public class SimpleMinionClusterIntegrationTest extends 
ClusterTest {
       assertEquals(tableConfigs.size(), 2);
 
       // Generate at most 2 tasks
-      if (_clusterInfoProvider.getTaskStates(TASK_TYPE).size() >= 2) {
+      if (_clusterInfoAccessor.getTaskStates(TASK_TYPE).size() >= 2) {
         return Collections.emptyList();
       }
 
diff --git 
a/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java 
b/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java
index fd82dda..abb0788 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java
@@ -42,6 +42,7 @@ import org.apache.pinot.common.utils.ServiceStatus;
 import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
 import org.apache.pinot.minion.events.EventObserverFactoryRegistry;
 import org.apache.pinot.minion.events.MinionEventObserverFactory;
+import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
 import org.apache.pinot.minion.executor.PinotTaskExecutorFactory;
 import org.apache.pinot.minion.executor.TaskExecutorFactoryRegistry;
 import org.apache.pinot.minion.metrics.MinionMeter;
@@ -81,7 +82,8 @@ public class MinionStarter implements ServiceStartable {
             + CommonConstants.Minion.DEFAULT_HELIX_PORT);
     setupHelixSystemProperties();
     _helixManager = new ZKHelixManager(helixClusterName, _instanceId, 
InstanceType.PARTICIPANT, zkAddress);
-    _taskExecutorFactoryRegistry = new TaskExecutorFactoryRegistry();
+    MinionTaskZkMetadataManager minionTaskZkMetadataManager = new 
MinionTaskZkMetadataManager(_helixManager);
+    _taskExecutorFactoryRegistry = new 
TaskExecutorFactoryRegistry(minionTaskZkMetadataManager);
     _eventObserverFactoryRegistry = new EventObserverFactoryRegistry();
   }
 
diff --git 
a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseMultipleSegmentsConversionExecutor.java
 
b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseMultipleSegmentsConversionExecutor.java
index a17b10e..ee7be9e 100644
--- 
a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseMultipleSegmentsConversionExecutor.java
+++ 
b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseMultipleSegmentsConversionExecutor.java
@@ -64,9 +64,23 @@ public abstract class BaseMultipleSegmentsConversionExecutor 
extends BaseTaskExe
       File workingDir)
       throws Exception;
 
+  /**
+   * Pre-processing operations to be done at the beginning of task execution
+   */
+  protected void preProcess(PinotTaskConfig pinotTaskConfig) {
+  }
+
+  /**
+   * Post-processing operations to be done at the end of a successful task execution
+   */
+  protected void postProcess(PinotTaskConfig pinotTaskConfig) {
+  }
+
   @Override
   public List<SegmentConversionResult> executeTask(PinotTaskConfig 
pinotTaskConfig)
       throws Exception {
+    preProcess(pinotTaskConfig);
+
     String taskType = pinotTaskConfig.getTaskType();
     Map<String, String> configs = pinotTaskConfig.getConfigs();
     String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
@@ -141,6 +155,7 @@ public abstract class 
BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
 
       String outputSegmentNames = 
segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName)
           .collect(Collectors.joining(","));
+      postProcess(pinotTaskConfig);
       LOGGER
           .info("Done executing {} on table: {}, input segments: {}, output 
segments: {}", taskType, tableNameWithType,
               inputSegmentNames, outputSegmentNames);
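
The new hooks turn executeTask() into a small template method: preProcess() runs before any segment is downloaded or converted, and postProcess() runs only after the converted segments have been uploaded. A hedged sketch of a subclass using them (the class name, method bodies, and the SegmentConversionResult import path are illustrative assumptions):

    import java.io.File;
    import java.util.Collections;
    import java.util.List;
    import org.apache.pinot.core.minion.PinotTaskConfig;
    import org.apache.pinot.core.minion.SegmentConversionResult; // assumed import path
    import org.apache.pinot.minion.executor.BaseMultipleSegmentsConversionExecutor;

    public class AuditingSegmentsConversionExecutor extends BaseMultipleSegmentsConversionExecutor {
      @Override
      protected void preProcess(PinotTaskConfig pinotTaskConfig) {
        // e.g. validate external state before doing any work
      }

      @Override
      protected void postProcess(PinotTaskConfig pinotTaskConfig) {
        // e.g. persist a watermark or emit a metric, now that the upload has succeeded
      }

      @Override
      protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig,
          List<File> originalIndexDirs, File workingDir) {
        return Collections.emptyList(); // no-op conversion, for illustration only
      }
    }
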
diff --git 
a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseTaskExecutor.java
 
b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseTaskExecutor.java
index 619acc7..6875032 100644
--- 
a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseTaskExecutor.java
+++ 
b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseTaskExecutor.java
@@ -43,8 +43,7 @@ public abstract class BaseTaskExecutor implements 
PinotTaskExecutor {
   }
 
   protected Schema getSchema(String tableName) {
-    Schema schema =
-        
ZKMetadataProvider.getTableSchema(MINION_CONTEXT.getHelixPropertyStore(), 
tableName);
+    Schema schema = 
ZKMetadataProvider.getTableSchema(MINION_CONTEXT.getHelixPropertyStore(), 
tableName);
     Preconditions.checkState(schema != null, "Failed to find schema for table: 
%s", tableName);
     return schema;
   }
diff --git 
a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/MinionTaskZkMetadataManager.java
 
b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/MinionTaskZkMetadataManager.java
new file mode 100644
index 0000000..29354b3
--- /dev/null
+++ 
b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/MinionTaskZkMetadataManager.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.minion.executor;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.ZNRecord;
+import org.apache.pinot.common.minion.MinionTaskMetadataUtils;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
+import 
org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask;
+
+
+/**
+ * An abstraction on top of {@link HelixManager} for use by {@link PinotTaskExecutor} implementations, restricted to getting and updating minion task metadata
+ */
+public class MinionTaskZkMetadataManager {
+  private final HelixManager _helixManager;
+
+  public MinionTaskZkMetadataManager(HelixManager helixManager) {
+    _helixManager = helixManager;
+  }
+
+  /**
+   * Fetch the ZNRecord under 
MINION_TASK_METADATA/RealtimeToOfflineSegmentsTask for the given 
tableNameWithType
+   */
+  public ZNRecord getRealtimeToOfflineSegmentsTaskZNRecord(String 
tableNameWithType) {
+    return MinionTaskMetadataUtils
+        
.fetchMinionTaskMetadataZNRecord(_helixManager.getHelixPropertyStore(), 
RealtimeToOfflineSegmentsTask.TASK_TYPE,
+            tableNameWithType);
+  }
+
+  /**
+   * Sets the {@link RealtimeToOfflineSegmentsTaskMetadata} into the ZNode at MINION_TASK_METADATA/RealtimeToOfflineSegmentsTask
+   * for the corresponding tableNameWithType
+   * @param expectedVersion The expected version of the ZNode being updated; the call fails if the versions do not match
+   */
+  public void setRealtimeToOfflineSegmentsTaskMetadata(
+      RealtimeToOfflineSegmentsTaskMetadata 
realtimeToOfflineSegmentsTaskMetadata, int expectedVersion) {
+    
MinionTaskMetadataUtils.persistRealtimeToOfflineSegmentsTaskMetadata(_helixManager.getHelixPropertyStore(),
+        RealtimeToOfflineSegmentsTask.TASK_TYPE, 
realtimeToOfflineSegmentsTaskMetadata, expectedVersion);
+  }
+}
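
A hedged usage sketch of the read-check-write cycle this class enables (it mirrors what the RealtimeToOfflineSegmentsTaskExecutor below does in preProcess/postProcess; the surrounding class and variable names are illustrative):

    import org.apache.helix.ZNRecord;
    import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
    import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;

    final class WatermarkUpdateSketch {
      static void advanceWatermark(MinionTaskZkMetadataManager metadataManager, String realtimeTableName,
          long newWatermarkMs) {
        // Read the current metadata and remember the ZNode version it came from
        ZNRecord znRecord = metadataManager.getRealtimeToOfflineSegmentsTaskZNRecord(realtimeTableName);
        int expectedVersion = znRecord.getVersion();
        long currentWatermarkMs =
            RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(znRecord).getWatermarkMs();
        // ... do the work that justifies moving the watermark past currentWatermarkMs ...
        // Write back with the cached version: a concurrent update makes this call fail
        // instead of being silently overwritten
        metadataManager.setRealtimeToOfflineSegmentsTaskMetadata(
            new RealtimeToOfflineSegmentsTaskMetadata(realtimeTableName, newWatermarkMs), expectedVersion);
      }
    }
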
diff --git 
a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutor.java
 
b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutor.java
index 936f027..82cb2fe 100644
--- 
a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutor.java
+++ 
b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutor.java
@@ -28,6 +28,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.FileUtils;
+import org.apache.helix.ZNRecord;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
 import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.core.minion.PinotTaskConfig;
 import org.apache.pinot.core.segment.processing.collector.CollectorConfig;
@@ -46,25 +48,71 @@ import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.DateTimeFormatSpec;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 /**
  * A task to convert segments from a REALTIME table to segments for its 
corresponding OFFLINE table.
- * The realtime segments could span across multiple time windows. This task 
extracts data and creates segments for a configured time range.
+ * The realtime segments could span multiple time windows.
+ * This task extracts data and creates segments for a configured time range.
  * The {@link SegmentProcessorFramework} is used for the segment conversion, 
which also does
- * 1. time column rollup
- * 2. time window extraction using filter function
+ * 1. time window extraction using filter function
+ * 2. time column rollup
  * 3. partitioning using table config's segmentPartitioningConfig
  * 4. aggregations and rollup
  * 5. data sorting
+ *
+ * Before beginning the task, the <code>watermarkMs</code> is checked in the minion task metadata ZNode,
+ * located at MINION_TASK_METADATA/RealtimeToOfflineSegmentsTask/<tableNameWithType>.
+ * It should match the <code>windowStartMs</code> of the task, and the version of the ZNode is cached.
+ *
+ * After the segments are uploaded, this task updates the <code>watermarkMs</code> in the minion task metadata ZNode.
+ * The ZNode version is checked during the update, and the update only succeeds if the version matches the previously cached one.
  */
 public class RealtimeToOfflineSegmentsTaskExecutor extends 
BaseMultipleSegmentsConversionExecutor {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskExecutor.class);
   private static final String INPUT_SEGMENTS_DIR = "input_segments";
   private static final String OUTPUT_SEGMENTS_DIR = "output_segments";
 
+  private final MinionTaskZkMetadataManager _minionTaskZkMetadataManager;
+  private int _expectedVersion = Integer.MIN_VALUE;
+  private long _nextWatermark;
+
+  public RealtimeToOfflineSegmentsTaskExecutor(MinionTaskZkMetadataManager 
minionTaskZkMetadataManager) {
+    _minionTaskZkMetadataManager = minionTaskZkMetadataManager;
+  }
+
+  /**
+   * Fetches the RealtimeToOfflineSegmentsTask metadata ZNode for the realtime 
table.
+   * Checks that the <code>watermarkMs</code> from the ZNode matches the 
windowStartMs in the task configs.
+   * If so, caches the ZNode version so it can be checked during the update.
+   */
+  @Override
+  public void preProcess(PinotTaskConfig pinotTaskConfig) {
+    Map<String, String> configs = pinotTaskConfig.getConfigs();
+    String realtimeTableName = configs.get(MinionConstants.TABLE_NAME_KEY);
+
+    ZNRecord realtimeToOfflineSegmentsTaskZNRecord =
+        
_minionTaskZkMetadataManager.getRealtimeToOfflineSegmentsTaskZNRecord(realtimeTableName);
+    Preconditions.checkState(realtimeToOfflineSegmentsTaskZNRecord != null,
+        "RealtimeToOfflineSegmentsTaskMetadata ZNRecord for table: %s should 
not be null. Exiting task.",
+        realtimeTableName);
+
+    RealtimeToOfflineSegmentsTaskMetadata 
realtimeToOfflineSegmentsTaskMetadata =
+        
RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(realtimeToOfflineSegmentsTaskZNRecord);
+    long windowStartMs = 
Long.parseLong(configs.get(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY));
+    
Preconditions.checkState(realtimeToOfflineSegmentsTaskMetadata.getWatermarkMs() 
== windowStartMs,
+        "watermarkMs in RealtimeToOfflineSegmentsTask metadata: %s does not match windowStartMs: %s in task configs for table: %s. "
+            + "ZNode may have been modified by another task", 
realtimeToOfflineSegmentsTaskMetadata, windowStartMs,
+        realtimeTableName);
+
+    _expectedVersion = realtimeToOfflineSegmentsTaskZNRecord.getVersion();
+  }
+
   @Override
   protected List<SegmentConversionResult> convert(PinotTaskConfig 
pinotTaskConfig, List<File> originalIndexDirs,
       File workingDir)
@@ -74,19 +122,22 @@ public class RealtimeToOfflineSegmentsTaskExecutor extends 
BaseMultipleSegmentsC
     LOGGER.info("Starting task: {} with configs: {}", taskType, configs);
     long startMillis = System.currentTimeMillis();
 
-    String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY); // 
rawTableName_OFFLINE expected here
-    TableConfig tableConfig = getTableConfig(tableNameWithType);
-    Schema schema = getSchema(tableNameWithType);
+    String realtimeTableName = configs.get(MinionConstants.TABLE_NAME_KEY);
+    String rawTableName = 
TableNameBuilder.extractRawTableName(realtimeTableName);
+    String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
+    TableConfig tableConfig = getTableConfig(offlineTableName);
+    Schema schema = getSchema(offlineTableName);
     Set<String> schemaColumns = schema.getPhysicalColumnNames();
     String timeColumn = tableConfig.getValidationConfig().getTimeColumnName();
     DateTimeFieldSpec dateTimeFieldSpec = 
schema.getSpecForTimeColumn(timeColumn);
     Preconditions
         .checkState(dateTimeFieldSpec != null, "No valid spec found for time 
column: %s in schema for table: %s",
-            timeColumn, tableNameWithType);
+            timeColumn, offlineTableName);
+
+    long windowStartMs = 
Long.parseLong(configs.get(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY));
+    long windowEndMs = 
Long.parseLong(configs.get(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY));
+    _nextWatermark = windowEndMs;
 
-    long windowStartMs =
-        
Long.parseLong(configs.get(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MILLIS_KEY));
-    long windowEndMs = 
Long.parseLong(configs.get(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MILLIS_KEY));
     String timeColumnTransformFunction =
         
configs.get(MinionConstants.RealtimeToOfflineSegmentsTask.TIME_COLUMN_TRANSFORM_FUNCTION_KEY);
     String collectorTypeStr = 
configs.get(MinionConstants.RealtimeToOfflineSegmentsTask.COLLECTOR_TYPE_KEY);
@@ -120,7 +171,7 @@ public class RealtimeToOfflineSegmentsTaskExecutor extends 
BaseMultipleSegmentsC
     if (tableConfig.getIndexingConfig().getSegmentPartitionConfig() != null) {
       Map<String, ColumnPartitionConfig> columnPartitionMap =
           
tableConfig.getIndexingConfig().getSegmentPartitionConfig().getColumnPartitionMap();
-      PartitionerConfig partitionerConfig = 
getPartitionerConfig(columnPartitionMap, tableNameWithType, schemaColumns);
+      PartitionerConfig partitionerConfig = 
getPartitionerConfig(columnPartitionMap, offlineTableName, schemaColumns);
       
segmentProcessorConfigBuilder.setPartitionerConfigs(Lists.newArrayList(partitionerConfig));
     }
 
@@ -162,12 +213,25 @@ public class RealtimeToOfflineSegmentsTaskExecutor 
extends BaseMultipleSegmentsC
     for (File file : outputSegmentsDir.listFiles()) {
       String outputSegmentName = file.getName();
       results.add(new 
SegmentConversionResult.Builder().setFile(file).setSegmentName(outputSegmentName)
-          .setTableNameWithType(tableNameWithType).build());
+          .setTableNameWithType(offlineTableName).build());
     }
     return results;
   }
 
   /**
+   * Fetches the RealtimeToOfflineSegmentsTask metadata ZNode for the realtime table.
+   * Checks that the version of the ZNode matches the version cached earlier. If so, updates the watermark in the ZNode.
+   * TODO: Making the minion task update the ZK metadata is an anti-pattern, but no better alternative is apparent
+   */
+  @Override
+  public void postProcess(PinotTaskConfig pinotTaskConfig) {
+    String realtimeTableName = 
pinotTaskConfig.getConfigs().get(MinionConstants.TABLE_NAME_KEY);
+    RealtimeToOfflineSegmentsTaskMetadata newMinionMetadata =
+        new RealtimeToOfflineSegmentsTaskMetadata(realtimeTableName, 
_nextWatermark);
+    
_minionTaskZkMetadataManager.setRealtimeToOfflineSegmentsTaskMetadata(newMinionMetadata,
 _expectedVersion);
+  }
+
+  /**
    * Construct a {@link RecordTransformerConfig} for time column transformation
    */
   private RecordTransformerConfig getRecordTransformerConfigForTime(String 
timeColumnTransformFunction,
diff --git 
a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutorFactory.java
 
b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutorFactory.java
index b2db61f..7eabbc4 100644
--- 
a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutorFactory.java
+++ 
b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutorFactory.java
@@ -18,9 +18,19 @@
  */
 package org.apache.pinot.minion.executor;
 
+/**
+ * Factory for creating {@link RealtimeToOfflineSegmentsTaskExecutor} instances
+ */
 public class RealtimeToOfflineSegmentsTaskExecutorFactory implements 
PinotTaskExecutorFactory {
+
+  private final MinionTaskZkMetadataManager _minionTaskZkMetadataManager;
+
+  public 
RealtimeToOfflineSegmentsTaskExecutorFactory(MinionTaskZkMetadataManager 
minionTaskZkMetadataManager) {
+    _minionTaskZkMetadataManager = minionTaskZkMetadataManager;
+  }
+
   @Override
   public PinotTaskExecutor create() {
-    return new RealtimeToOfflineSegmentsTaskExecutor();
+    return new 
RealtimeToOfflineSegmentsTaskExecutor(_minionTaskZkMetadataManager);
   }
 }
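
Putting the pieces together on a minion (this mirrors the MinionStarter change earlier in this patch; the wrapper method is illustrative and 'helixManager' is assumed to be an already-connected HelixManager):

    import org.apache.helix.HelixManager;
    import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
    import org.apache.pinot.minion.executor.PinotTaskExecutor;
    import org.apache.pinot.minion.executor.RealtimeToOfflineSegmentsTaskExecutorFactory;
    import org.apache.pinot.minion.executor.TaskExecutorFactoryRegistry;

    final class MinionWiringSketch {
      static PinotTaskExecutor newRealtimeToOfflineExecutor(HelixManager helixManager) {
        // The metadata manager is the executor's only handle on the watermark ZNode
        MinionTaskZkMetadataManager metadataManager = new MinionTaskZkMetadataManager(helixManager);
        // The registry constructor registers RealtimeToOfflineSegmentsTaskExecutorFactory(metadataManager)
        // along with the other built-in factories (see TaskExecutorFactoryRegistry below)
        TaskExecutorFactoryRegistry registry = new TaskExecutorFactoryRegistry(metadataManager);
        // Creating an executor straight from the factory is equivalent for illustration purposes
        return new RealtimeToOfflineSegmentsTaskExecutorFactory(metadataManager).create();
      }
    }
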
diff --git 
a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java
 
b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java
index bd28f79..1b783dc 100644
--- 
a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java
+++ 
b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java
@@ -31,13 +31,13 @@ import org.apache.pinot.core.common.MinionConstants;
 public class TaskExecutorFactoryRegistry {
   private final Map<String, PinotTaskExecutorFactory> 
_taskExecutorFactoryRegistry = new HashMap<>();
 
-  public TaskExecutorFactoryRegistry() {
+  public TaskExecutorFactoryRegistry(MinionTaskZkMetadataManager 
minionTaskZkMetadataManager) {
     
registerTaskExecutorFactory(MinionConstants.ConvertToRawIndexTask.TASK_TYPE,
         new ConvertToRawIndexTaskExecutorFactory());
     registerTaskExecutorFactory(MinionConstants.PurgeTask.TASK_TYPE, new 
PurgeTaskExecutorFactory());
     registerTaskExecutorFactory(MinionConstants.MergeRollupTask.TASK_TYPE, new 
MergeRollupTaskExecutorFactory());
     
registerTaskExecutorFactory(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
-        new RealtimeToOfflineSegmentsTaskExecutorFactory());
+        new 
RealtimeToOfflineSegmentsTaskExecutorFactory(minionTaskZkMetadataManager));
   }
 
   /**
diff --git 
a/pinot-minion/src/test/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutorTest.java
 
b/pinot-minion/src/test/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutorTest.java
index 601c5e4..341f543 100644
--- 
a/pinot-minion/src/test/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutorTest.java
+++ 
b/pinot-minion/src/test/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutorTest.java
@@ -96,15 +96,16 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_WITH_PARTITIONING).setTimeColumnName(T)
             .setSegmentPartitionConfig(new 
SegmentPartitionConfig(columnPartitionConfigMap)).build();
     TableConfig tableConfigWithSortedCol =
-        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_WITH_SORTED_COL).setTimeColumnName(T).setSortedColumn(D1)
-            .build();
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_WITH_SORTED_COL).setTimeColumnName(T)
+            .setSortedColumn(D1).build();
     TableConfig tableConfigEpochHours =
-        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_EPOCH_HOURS).setTimeColumnName(T_TRX).setSortedColumn(D1)
-            .setIngestionConfig(new IngestionConfig(null, 
Lists.newArrayList(new TransformConfig(T_TRX, "toEpochHours(t)"))))
-            .build();
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_EPOCH_HOURS).setTimeColumnName(T_TRX)
+            .setSortedColumn(D1).setIngestionConfig(
+            new IngestionConfig(null, Lists.newArrayList(new 
TransformConfig(T_TRX, "toEpochHours(t)")))).build();
     TableConfig tableConfigSDF =
-        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_SDF).setTimeColumnName(T_TRX).setSortedColumn(D1)
-            .setIngestionConfig(new IngestionConfig(null, 
Lists.newArrayList(new TransformConfig(T_TRX, "toDateTime(t, 'yyyyMMddHH')"))))
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_SDF).setTimeColumnName(T_TRX)
+            .setSortedColumn(D1).setIngestionConfig(
+            new IngestionConfig(null, Lists.newArrayList(new 
TransformConfig(T_TRX, "toDateTime(t, 'yyyyMMddHH')"))))
             .build();
     Schema schema =
         new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension(D1, 
FieldSpec.DataType.STRING)
@@ -112,12 +113,12 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
             .addDateTime(T, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", 
"1:MILLISECONDS").build();
     Schema schemaEpochHours =
         new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension(D1, 
FieldSpec.DataType.STRING)
-            .addMetric(M1, FieldSpec.DataType.INT).addDateTime(T_TRX, 
FieldSpec.DataType.INT, "1:HOURS:EPOCH", "1:HOURS")
-            .build();
+            .addMetric(M1, FieldSpec.DataType.INT)
+            .addDateTime(T_TRX, FieldSpec.DataType.INT, "1:HOURS:EPOCH", 
"1:HOURS").build();
     Schema schemaSDF =
         new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension(D1, 
FieldSpec.DataType.STRING)
-            .addMetric(M1, FieldSpec.DataType.INT).addDateTime(T_TRX, 
FieldSpec.DataType.INT, "1:HOURS:SIMPLE_DATE_FORMAT:yyyyMMddHH", "1:HOURS")
-            .build();
+            .addMetric(M1, FieldSpec.DataType.INT)
+            .addDateTime(T_TRX, FieldSpec.DataType.INT, 
"1:HOURS:SIMPLE_DATE_FORMAT:yyyyMMddHH", "1:HOURS").build();
 
     List<String> d1 = Lists.newArrayList("foo", "bar", "foo", "foo", "bar");
     List<List<GenericRow>> rows = new ArrayList<>(NUM_SEGMENTS);
@@ -213,12 +214,13 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
     FileUtils.deleteQuietly(WORKING_DIR);
 
     RealtimeToOfflineSegmentsTaskExecutor realtimeToOfflineSegmentsTaskExecutor =
-        new RealtimeToOfflineSegmentsTaskExecutor();
+        new RealtimeToOfflineSegmentsTaskExecutor(null);
     Map<String, String> configs = new HashMap<>();
     configs.put(MinionConstants.TABLE_NAME_KEY, "testTable_OFFLINE");
-    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MILLIS_KEY, "1600473600000");
-    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MILLIS_KEY, "1600560000000");
-    PinotTaskConfig pinotTaskConfig = new PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, configs);
+    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY, "1600473600000");
+    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY, "1600560000000");
+    PinotTaskConfig pinotTaskConfig =
+        new PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, configs);
 
     List<SegmentConversionResult> conversionResults =
         realtimeToOfflineSegmentsTaskExecutor.convert(pinotTaskConfig, _segmentIndexDirList, WORKING_DIR);
@@ -239,13 +241,14 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
     FileUtils.deleteQuietly(WORKING_DIR);
 
     RealtimeToOfflineSegmentsTaskExecutor realtimeToOfflineSegmentsTaskExecutor =
-        new RealtimeToOfflineSegmentsTaskExecutor();
+        new RealtimeToOfflineSegmentsTaskExecutor(null);
     Map<String, String> configs = new HashMap<>();
     configs.put(MinionConstants.TABLE_NAME_KEY, TABLE_NAME);
-    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MILLIS_KEY, "1600473600000");
-    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MILLIS_KEY, "1600560000000");
+    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY, "1600473600000");
+    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY, "1600560000000");
     configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.COLLECTOR_TYPE_KEY, "rollup");
-    PinotTaskConfig pinotTaskConfig = new PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, configs);
+    PinotTaskConfig pinotTaskConfig =
+        new PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, configs);
 
     List<SegmentConversionResult> conversionResults =
         realtimeToOfflineSegmentsTaskExecutor.convert(pinotTaskConfig, _segmentIndexDirList, WORKING_DIR);
@@ -266,14 +269,15 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
     FileUtils.deleteQuietly(WORKING_DIR);
 
     RealtimeToOfflineSegmentsTaskExecutor realtimeToOfflineSegmentsTaskExecutor =
-        new RealtimeToOfflineSegmentsTaskExecutor();
+        new RealtimeToOfflineSegmentsTaskExecutor(null);
     Map<String, String> configs = new HashMap<>();
     configs.put(MinionConstants.TABLE_NAME_KEY, TABLE_NAME);
-    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MILLIS_KEY, "1600473600000");
-    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MILLIS_KEY, "1600560000000");
+    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY, "1600473600000");
+    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY, "1600560000000");
     configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.COLLECTOR_TYPE_KEY, "rollup");
     configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.TIME_COLUMN_TRANSFORM_FUNCTION_KEY, "round(t, 86400000)");
-    PinotTaskConfig pinotTaskConfig = new PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, configs);
+    PinotTaskConfig pinotTaskConfig =
+        new PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, configs);
 
     List<SegmentConversionResult> conversionResults =
         realtimeToOfflineSegmentsTaskExecutor.convert(pinotTaskConfig, _segmentIndexDirList, WORKING_DIR);
@@ -294,15 +298,16 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
     FileUtils.deleteQuietly(WORKING_DIR);
 
     RealtimeToOfflineSegmentsTaskExecutor realtimeToOfflineSegmentsTaskExecutor =
-        new RealtimeToOfflineSegmentsTaskExecutor();
+        new RealtimeToOfflineSegmentsTaskExecutor(null);
     Map<String, String> configs = new HashMap<>();
     configs.put(MinionConstants.TABLE_NAME_KEY, TABLE_NAME);
-    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MILLIS_KEY, "1600473600000");
-    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MILLIS_KEY, "1600560000000");
+    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY, "1600473600000");
+    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY, "1600560000000");
     configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.TIME_COLUMN_TRANSFORM_FUNCTION_KEY, "round(t, 86400000)");
     configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.COLLECTOR_TYPE_KEY, "rollup");
     configs.put(M1 + MinionConstants.RealtimeToOfflineSegmentsTask.AGGREGATION_TYPE_KEY_SUFFIX, "max");
-    PinotTaskConfig pinotTaskConfig = new PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, configs);
+    PinotTaskConfig pinotTaskConfig =
+        new PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, configs);
 
     List<SegmentConversionResult> conversionResults =
         realtimeToOfflineSegmentsTaskExecutor.convert(pinotTaskConfig, _segmentIndexDirList, WORKING_DIR);
@@ -326,12 +331,13 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
     FileUtils.deleteQuietly(WORKING_DIR);
 
     RealtimeToOfflineSegmentsTaskExecutor realtimeToOfflineSegmentsTaskExecutor =
-        new RealtimeToOfflineSegmentsTaskExecutor();
+        new RealtimeToOfflineSegmentsTaskExecutor(null);
     Map<String, String> configs = new HashMap<>();
     configs.put(MinionConstants.TABLE_NAME_KEY, TABLE_NAME_WITH_PARTITIONING);
-    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MILLIS_KEY, "1600468000000");
-    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MILLIS_KEY, "1600617600000");
-    PinotTaskConfig pinotTaskConfig = new PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, configs);
+    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY, "1600468000000");
+    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY, "1600617600000");
+    PinotTaskConfig pinotTaskConfig =
+        new PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, configs);
 
     List<SegmentConversionResult> conversionResults =
         realtimeToOfflineSegmentsTaskExecutor.convert(pinotTaskConfig, _segmentIndexDirList, WORKING_DIR);
@@ -357,13 +363,14 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
     FileUtils.deleteQuietly(WORKING_DIR);
 
     RealtimeToOfflineSegmentsTaskExecutor realtimeToOfflineSegmentsTaskExecutor =
-        new RealtimeToOfflineSegmentsTaskExecutor();
+        new RealtimeToOfflineSegmentsTaskExecutor(null);
     Map<String, String> configs = new HashMap<>();
     configs.put(MinionConstants.TABLE_NAME_KEY, TABLE_NAME_WITH_SORTED_COL);
-    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MILLIS_KEY, "1600473600000");
-    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MILLIS_KEY, "1600560000000");
+    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY, "1600473600000");
+    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY, "1600560000000");
     configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.COLLECTOR_TYPE_KEY, "rollup");
-    PinotTaskConfig pinotTaskConfig = new PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, configs);
+    PinotTaskConfig pinotTaskConfig =
+        new PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, configs);
 
     List<SegmentConversionResult> conversionResults =
         realtimeToOfflineSegmentsTaskExecutor.convert(pinotTaskConfig, _segmentIndexDirList, WORKING_DIR);
@@ -384,13 +391,14 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
     FileUtils.deleteQuietly(WORKING_DIR);
 
     RealtimeToOfflineSegmentsTaskExecutor realtimeToOfflineSegmentsTaskExecutor =
-        new RealtimeToOfflineSegmentsTaskExecutor();
+        new RealtimeToOfflineSegmentsTaskExecutor(null);
     Map<String, String> configs = new HashMap<>();
     configs.put(MinionConstants.TABLE_NAME_KEY, TABLE_NAME_EPOCH_HOURS);
-    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MILLIS_KEY, "1600473600000");
-    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MILLIS_KEY, "1600560000000");
+    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY, "1600473600000");
+    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY, "1600560000000");
     configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.COLLECTOR_TYPE_KEY, "rollup");
-    PinotTaskConfig pinotTaskConfig = new PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, configs);
+    PinotTaskConfig pinotTaskConfig =
+        new PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, configs);
 
     List<SegmentConversionResult> conversionResults =
         realtimeToOfflineSegmentsTaskExecutor.convert(pinotTaskConfig, _segmentIndexDirListEpochHours, WORKING_DIR);
@@ -412,13 +420,14 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
     FileUtils.deleteQuietly(WORKING_DIR);
 
     RealtimeToOfflineSegmentsTaskExecutor realtimeToOfflineSegmentsTaskExecutor =
-        new RealtimeToOfflineSegmentsTaskExecutor();
+        new RealtimeToOfflineSegmentsTaskExecutor(null);
     Map<String, String> configs = new HashMap<>();
     configs.put(MinionConstants.TABLE_NAME_KEY, TABLE_NAME_SDF);
-    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MILLIS_KEY, "1600473600000");
-    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MILLIS_KEY, "1600560000000");
+    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY, "1600473600000");
+    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY, "1600560000000");
     configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.COLLECTOR_TYPE_KEY, "rollup");
-    PinotTaskConfig pinotTaskConfig = new PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, configs);
+    PinotTaskConfig pinotTaskConfig =
+        new PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, configs);
 
     List<SegmentConversionResult> conversionResults =
         realtimeToOfflineSegmentsTaskExecutor.convert(pinotTaskConfig, _segmentIndexDirListSDF, WORKING_DIR);
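
Read together, the tests above all exercise one pattern: build a map of task configs, wrap it in a PinotTaskConfig of the realtimeToOfflineSegmentsTask type, and hand it to the executor. The snippet below is only a consolidated sketch of that pattern as it appears in this diff, not an additional API. The segmentIndexDirs and workingDir variables are hypothetical stand-ins for the test fixtures (_segmentIndexDirList and WORKING_DIR), "m1" is a placeholder metric column name, and the null argument stands in for the MinionTaskZkMetadataManager that a running minion would supply.

    // Sketch only, mirroring the test bodies in this diff.
    Map<String, String> configs = new HashMap<>();
    configs.put(MinionConstants.TABLE_NAME_KEY, "testTable_OFFLINE");
    // Window bounds are epoch millis; the keys were renamed from *_MILLIS_KEY to *_MS_KEY in this change.
    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY, "1600473600000");
    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY, "1600560000000");
    // Optional knobs exercised by the tests: rollup collection, per-metric aggregation, time-column transform.
    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.COLLECTOR_TYPE_KEY, "rollup");
    configs.put("m1" + MinionConstants.RealtimeToOfflineSegmentsTask.AGGREGATION_TYPE_KEY_SUFFIX, "max");
    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.TIME_COLUMN_TRANSFORM_FUNCTION_KEY, "round(t, 86400000)");
    PinotTaskConfig taskConfig =
        new PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, configs);

    // The tests pass null for the ZK metadata manager; a real minion wires one in.
    RealtimeToOfflineSegmentsTaskExecutor executor = new RealtimeToOfflineSegmentsTaskExecutor(null);
    List<SegmentConversionResult> results =
        executor.convert(taskConfig, segmentIndexDirs, workingDir);  // hypothetical fixtures

The window values shown are the same one-day epoch-millis bounds used throughout the tests in this diff.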

