This is an automated email from the ASF dual-hosted git repository.

manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 381097dc1b Added support to perform task validations for plug-in tasks 
(#14340)
381097dc1b is described below

commit 381097dc1ba990839f43fe40fc9144fb9c879769
Author: Ragesh Rajagopalan <ragesh.rajagopa...@gmail.com>
AuthorDate: Thu Nov 7 22:08:16 2024 -0800

    Added support to perform task validations for plug-in tasks (#14340)
    
    * Added support to perform task validations for plug-in tasks
    
    * trigger build2
---
 .../api/resources/PinotTableRestletResource.java   |   8 +
 .../api/resources/TableConfigsRestletResource.java |   7 +
 .../core/minion/generator/PinotTaskGenerator.java  |   8 +
 .../pinot/controller/util/TaskConfigUtils.java     |  95 ++++++++++
 .../api/PinotTableRestletResourceTest.java         |  42 ++++-
 .../core/minion/PinotTaskManagerStatelessTest.java |  50 ++++-
 .../pinot/controller/util/TaskConfigUtilsTest.java | 122 +++++++++++++
 .../apache/pinot/core/common/MinionConstants.java  |  24 +++
 .../RealtimeToOfflineSegmentsTaskGenerator.java    |  46 +++++
 .../UpsertCompactionTaskGenerator.java             |  47 +++++
 ...RealtimeToOfflineSegmentsTaskGeneratorTest.java | 137 ++++++++++++++
 .../UpsertCompactionTaskGeneratorTest.java         |  58 ++++++
 .../segment/local/utils/TableConfigUtils.java      | 122 -------------
 .../segment/local/utils/TableConfigUtilsTest.java  | 203 ---------------------
 14 files changed, 635 insertions(+), 334 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index 0320e149d7..040c57c885 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -94,6 +94,7 @@ import 
org.apache.pinot.controller.api.exception.TableAlreadyExistsException;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
 import 
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
+import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
@@ -105,6 +106,7 @@ import 
org.apache.pinot.controller.tuner.TableConfigTunerUtils;
 import org.apache.pinot.controller.util.CompletionServiceHelper;
 import org.apache.pinot.controller.util.TableIngestionStatusHelper;
 import org.apache.pinot.controller.util.TableMetadataReader;
+import org.apache.pinot.controller.util.TaskConfigUtils;
 import org.apache.pinot.core.auth.Actions;
 import org.apache.pinot.core.auth.Authorize;
 import org.apache.pinot.core.auth.ManualAuthorization;
@@ -170,6 +172,9 @@ public class PinotTableRestletResource {
   @Inject
   PinotHelixTaskResourceManager _pinotHelixTaskResourceManager;
 
+  @Inject
+  PinotTaskManager _pinotTaskManager;
+
   @Inject
   ControllerConf _controllerConf;
 
@@ -234,6 +239,7 @@ public class PinotTableRestletResource {
         TableConfigUtils.ensureMinReplicas(tableConfig, 
_controllerConf.getDefaultTableMinReplicas());
         TableConfigUtils.ensureStorageQuotaConstraints(tableConfig, 
_controllerConf.getDimTableMaxSize());
         
checkHybridTableConfig(TableNameBuilder.extractRawTableName(tableNameWithType), 
tableConfig);
+        TaskConfigUtils.validateTaskConfigs(tableConfig, _pinotTaskManager, 
typesToSkip);
       } catch (Exception e) {
         throw new InvalidTableConfigException(e);
       }
@@ -508,6 +514,7 @@ public class PinotTableRestletResource {
         TableConfigUtils.ensureMinReplicas(tableConfig, 
_controllerConf.getDefaultTableMinReplicas());
         TableConfigUtils.ensureStorageQuotaConstraints(tableConfig, 
_controllerConf.getDimTableMaxSize());
         
checkHybridTableConfig(TableNameBuilder.extractRawTableName(tableNameWithType), 
tableConfig);
+        TaskConfigUtils.validateTaskConfigs(tableConfig, _pinotTaskManager, 
typesToSkip);
       } catch (Exception e) {
         throw new InvalidTableConfigException(e);
       }
@@ -568,6 +575,7 @@ public class PinotTableRestletResource {
         throw new SchemaNotFoundException("Got empty schema");
       }
       TableConfigUtils.validate(tableConfig, schema, typesToSkip);
+      TaskConfigUtils.validateTaskConfigs(tableConfig, _pinotTaskManager, 
typesToSkip);
       ObjectNode tableConfigValidateStr = JsonUtils.newObjectNode();
       if (tableConfig.getTableType() == TableType.OFFLINE) {
         tableConfigValidateStr.set(TableType.OFFLINE.name(), 
tableConfig.toJsonNode());
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigsRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigsRestletResource.java
index df543614b9..5d55df6095 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigsRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigsRestletResource.java
@@ -61,7 +61,9 @@ import 
org.apache.pinot.controller.api.exception.ControllerApplicationException;
 import org.apache.pinot.controller.api.exception.InvalidTableConfigException;
 import org.apache.pinot.controller.api.exception.TableAlreadyExistsException;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
 import org.apache.pinot.controller.tuner.TableConfigTunerUtils;
+import org.apache.pinot.controller.util.TaskConfigUtils;
 import org.apache.pinot.core.auth.Actions;
 import org.apache.pinot.core.auth.Authorize;
 import org.apache.pinot.core.auth.ManualAuthorization;
@@ -102,6 +104,9 @@ public class TableConfigsRestletResource {
   @Inject
   PinotHelixResourceManager _pinotHelixResourceManager;
 
+  @Inject
+  PinotTaskManager _pinotTaskManager;
+
   @Inject
   ControllerConf _controllerConf;
 
@@ -457,6 +462,7 @@ public class TableConfigsRestletResource {
             "Name in 'offline' table config: %s must be equal to 'tableName': 
%s", offlineRawTableName, rawTableName);
         TableConfigUtils.validateTableName(offlineTableConfig);
         TableConfigUtils.validate(offlineTableConfig, schema, typesToSkip);
+        TaskConfigUtils.validateTaskConfigs(tableConfigs.getOffline(), 
_pinotTaskManager, typesToSkip);
       }
       if (realtimeTableConfig != null) {
         String realtimeRawTableName = DatabaseUtils.translateTableName(
@@ -465,6 +471,7 @@ public class TableConfigsRestletResource {
             "Name in 'realtime' table config: %s must be equal to 'tableName': 
%s", realtimeRawTableName, rawTableName);
         TableConfigUtils.validateTableName(realtimeTableConfig);
         TableConfigUtils.validate(realtimeTableConfig, schema, typesToSkip);
+        TaskConfigUtils.validateTaskConfigs(tableConfigs.getRealtime(), 
_pinotTaskManager, typesToSkip);
       }
       if (offlineTableConfig != null && realtimeTableConfig != null) {
         TableConfigUtils.verifyHybridTableConfigs(rawTableName, 
offlineTableConfig, realtimeTableConfig);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/PinotTaskGenerator.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/PinotTaskGenerator.java
index fd2a4615ed..9be76f253d 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/PinotTaskGenerator.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/PinotTaskGenerator.java
@@ -99,4 +99,12 @@ public interface PinotTaskGenerator {
   default String getMinionInstanceTag(TableConfig tableConfig) {
     return CommonConstants.Helix.UNTAGGED_MINION_INSTANCE;
   }
+
+  /**
+   * Performs validations specific to this generator's task type; the default implementation does nothing.
+   * @param tableConfig The table configuration that is getting 
added/updated/validated.
+   * @param taskConfigs The task type specific task configuration to be 
validated.
+   */
+  default void validateTaskConfigs(TableConfig tableConfig, Map<String, 
String> taskConfigs) {
+  }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TaskConfigUtils.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TaskConfigUtils.java
new file mode 100644
index 0000000000..059908ea8d
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TaskConfigUtils.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.util;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.Map;
+import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
+import 
org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
+import 
org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorRegistry;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.quartz.CronScheduleBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TaskConfigUtils {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TaskConfigUtils.class);
+  private static final String SCHEDULE_KEY = "schedule";
+  private static final String ALL_VALIDATION_TYPE = "ALL";
+  private static final String TASK_VALIDATION_TYPE = "TASK";
+
+  private TaskConfigUtils() {
+  }
+
+  public static void validateTaskConfigs(TableConfig tableConfig, 
PinotTaskManager pinotTaskManager,
+      String validationTypesToSkip) {
+    if (tableConfig == null || tableConfig.getTaskConfig() == null) {
+      return;
+    }
+    if (ALL_VALIDATION_TYPE.equalsIgnoreCase(validationTypesToSkip) || 
TASK_VALIDATION_TYPE.equalsIgnoreCase(
+        validationTypesToSkip)) {
+      LOGGER.info("Skipping task validation as validationTypesToSkip is set 
to: {}", validationTypesToSkip);
+      return;
+    }
+    TableTaskConfig taskConfig = tableConfig.getTaskConfig();
+    for (Map.Entry<String, Map<String, String>> taskConfigEntry : 
taskConfig.getTaskTypeConfigsMap().entrySet()) {
+      String taskType = taskConfigEntry.getKey();
+      TaskGeneratorRegistry taskRegistry = 
pinotTaskManager.getTaskGeneratorRegistry();
+      if (taskRegistry != null) {
+        PinotTaskGenerator taskGenerator = 
taskRegistry.getTaskGenerator(taskType);
+        if (taskGenerator != null) {
+          Map<String, String> taskConfigs = taskConfigEntry.getValue();
+          doCommonTaskValidations(tableConfig, taskType, taskConfigs);
+          taskGenerator.validateTaskConfigs(tableConfig, taskConfigs);
+        } else {
+          throw new RuntimeException(String.format("Task generator not found 
for task type: %s, while validating table "
+              + "configs for table: %s", taskType, 
tableConfig.getTableName()));
+        }
+      }
+    }
+  }
+
+  @VisibleForTesting
+  static void doCommonTaskValidations(TableConfig tableConfig, String taskType,
+      Map<String, String> taskConfigs) {
+    Preconditions.checkNotNull(taskConfigs, String.format("Task configuration 
for %s cannot be null.", taskType));
+    // The schedule is optional; when it is present, it must be a valid Quartz cron expression.
+    if (taskConfigs.containsKey(SCHEDULE_KEY)) {
+      String cronExprStr = taskConfigs.get(SCHEDULE_KEY);
+      try {
+        CronScheduleBuilder.cronSchedule(cronExprStr);
+      } catch (Exception e) {
+        throw new IllegalStateException(
+            String.format("Task %s contains an invalid cron schedule: %s", 
taskType, cronExprStr), e);
+      }
+    }
+    boolean isAllowDownloadFromServer = Boolean.parseBoolean(
+        
taskConfigs.getOrDefault(TableTaskConfig.MINION_ALLOW_DOWNLOAD_FROM_SERVER,
+            
String.valueOf(TableTaskConfig.DEFAULT_MINION_ALLOW_DOWNLOAD_FROM_SERVER)));
+    if (isAllowDownloadFromServer) {
+      
Preconditions.checkState(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme()
 != null,
+          String.format("Table %s has task %s with allowDownloadFromServer set 
to true, but "
+                  + "peerSegmentDownloadScheme is not set in the table 
config", tableConfig.getTableName(),
+              taskType));
+    }
+  }
+}
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
index 54644c26aa..fdb8c09072 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
@@ -30,8 +30,10 @@ import java.util.List;
 import java.util.Map;
 import org.apache.pinot.controller.helix.ControllerTest;
 import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
+import 
org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
 import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.minion.PinotTaskConfig;
 import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
 import org.apache.pinot.spi.config.table.QuotaConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -64,6 +66,7 @@ public class PinotTableRestletResourceTest extends 
ControllerTest {
   public void setUp()
       throws Exception {
     DEFAULT_INSTANCE.setupSharedStateAndValidate();
+    registerMinionTasks();
     _createTableUrl = 
DEFAULT_INSTANCE.getControllerRequestURLBuilder().forTableCreate();
     
_offlineBuilder.setTableName(OFFLINE_TABLE_NAME).setTimeColumnName("timeColumn").setTimeType("DAYS")
         .setRetentionTimeUnit("DAYS").setRetentionTimeValue("5");
@@ -77,6 +80,43 @@ public class PinotTableRestletResourceTest extends 
ControllerTest {
         .setStreamConfigs(streamConfig.getStreamConfigsMap());
   }
 
+  private void registerMinionTasks() {
+    PinotTaskManager taskManager = 
DEFAULT_INSTANCE.getControllerStarter().getTaskManager();
+    taskManager.registerTaskGenerator(new BaseTaskGenerator() {
+      @Override
+      public String getTaskType() {
+        return MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE;
+      }
+
+      @Override
+      public List<PinotTaskConfig> generateTasks(List<TableConfig> 
tableConfigs) {
+        return List.of(new 
PinotTaskConfig(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE, new 
HashMap<>()));
+      }
+    });
+    taskManager.registerTaskGenerator(new BaseTaskGenerator() {
+      @Override
+      public String getTaskType() {
+        return MinionConstants.MergeRollupTask.TASK_TYPE;
+      }
+
+      @Override
+      public List<PinotTaskConfig> generateTasks(List<TableConfig> 
tableConfigs) {
+        return List.of(new 
PinotTaskConfig(MinionConstants.MergeRollupTask.TASK_TYPE, new HashMap<>()));
+      }
+    });
+    taskManager.registerTaskGenerator(new BaseTaskGenerator() {
+      @Override
+      public String getTaskType() {
+        return MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE;
+      }
+
+      @Override
+      public List<PinotTaskConfig> generateTasks(List<TableConfig> 
tableConfigs) {
+        return List.of(new 
PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, new 
HashMap<>()));
+      }
+    });
+  }
+
   @Test
   public void testCreateTable()
       throws Exception {
@@ -708,6 +748,6 @@ public class PinotTableRestletResourceTest extends 
ControllerTest {
 
   @AfterClass
   public void tearDown() {
-    DEFAULT_INSTANCE.cleanup();
+    DEFAULT_INSTANCE.stopSharedTestSetup();
   }
 }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerStatelessTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerStatelessTest.java
index e4405502ef..f224f4cd56 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerStatelessTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerStatelessTest.java
@@ -19,6 +19,10 @@
 package org.apache.pinot.controller.helix.core.minion;
 
 import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -26,6 +30,8 @@ import java.util.Set;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.exception.HttpErrorStatusException;
+import org.apache.pinot.common.utils.http.HttpClient;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.helix.ControllerTest;
 import 
org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator;
@@ -103,7 +109,7 @@ public class PinotTaskManagerStatelessTest extends 
ControllerTest {
         new TableTaskConfig(
             ImmutableMap.of("SegmentGenerationAndPushTask", 
ImmutableMap.of("schedule", "0 * * ? * * *")))).build();
     waitForEVToDisappear(tableConfig.getTableName());
-    addTableConfig(tableConfig);
+    addTableConfig(tableConfig, "TASK");
     waitForJobGroupNames(_controllerStarter.getTaskManager(),
         jgn -> jgn.size() == 1 && 
jgn.contains(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE),
         "JobGroupNames should have SegmentGenerationAndPushTask only");
@@ -139,7 +145,7 @@ public class PinotTaskManagerStatelessTest extends 
ControllerTest {
         new TableTaskConfig(
             ImmutableMap.of(segmentGenerationAndPushTask, 
ImmutableMap.of("schedule", "0 */10 * ? * * *")))).build();
     waitForEVToDisappear(tableConfig.getTableName());
-    addTableConfig(tableConfig);
+    addTableConfig(tableConfig, "TASK");
     waitForJobGroupNames(taskManager, jgn -> jgn.size() == 1 && 
jgn.contains(segmentGenerationAndPushTask),
         "JobGroupNames should have SegmentGenerationAndPushTask only");
     validateJob(segmentGenerationAndPushTask, "0 */10 * ? * * *");
@@ -233,7 +239,7 @@ public class PinotTaskManagerStatelessTest extends 
ControllerTest {
         new TableTaskConfig(
             ImmutableMap.of("SegmentGenerationAndPushTask", 
ImmutableMap.of("schedule", "0 */10 * ? * * *")))).build();
     waitForEVToDisappear(tableConfig.getTableName());
-    addTableConfig(tableConfig);
+    addTableConfig(tableConfig, "TASK");
     waitForJobGroupNames(_controllerStarter.getTaskManager(),
         jgn -> jgn.size() == 1 && 
jgn.contains(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE),
         "JobGroupNames should have SegmentGenerationAndPushTask only");
@@ -242,7 +248,7 @@ public class PinotTaskManagerStatelessTest extends 
ControllerTest {
     // 2. Update table to new schedule
     tableConfig.setTaskConfig(new TableTaskConfig(
         ImmutableMap.of("SegmentGenerationAndPushTask", 
ImmutableMap.of("schedule", "0 */20 * ? * * *"))));
-    updateTableConfig(tableConfig);
+    updateTableConfig(tableConfig, "TASK");
     waitForJobGroupNames(_controllerStarter.getTaskManager(),
         jgn -> jgn.size() == 1 && 
jgn.contains(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE),
         "JobGroupNames should have SegmentGenerationAndPushTask only");
@@ -252,7 +258,7 @@ public class PinotTaskManagerStatelessTest extends 
ControllerTest {
     tableConfig.setTaskConfig(new TableTaskConfig(
         ImmutableMap.of("SegmentGenerationAndPushTask", 
ImmutableMap.of("schedule", "0 */30 * ? * * *"),
             "MergeRollupTask", ImmutableMap.of("schedule", "0 */10 * ? * * 
*"))));
-    updateTableConfig(tableConfig);
+    updateTableConfig(tableConfig, "TASK");
     waitForJobGroupNames(_controllerStarter.getTaskManager(),
         jgn -> jgn.size() == 2 && 
jgn.contains(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE) && 
jgn.contains(
             MinionConstants.MergeRollupTask.TASK_TYPE),
@@ -263,7 +269,7 @@ public class PinotTaskManagerStatelessTest extends 
ControllerTest {
     // 4. Remove one task from the table
     tableConfig.setTaskConfig(
         new TableTaskConfig(ImmutableMap.of("MergeRollupTask", 
ImmutableMap.of("schedule", "0 */10 * ? * * *"))));
-    updateTableConfig(tableConfig);
+    updateTableConfig(tableConfig, "TASK");
     waitForJobGroupNames(_controllerStarter.getTaskManager(),
         jgn -> jgn.size() == 1 && 
jgn.contains(MinionConstants.MergeRollupTask.TASK_TYPE),
         "JobGroupNames should have MergeRollupTask only");
@@ -300,7 +306,7 @@ public class PinotTaskManagerStatelessTest extends 
ControllerTest {
         new TableTaskConfig(
             ImmutableMap.of("SegmentGenerationAndPushTask", 
ImmutableMap.of("schedule", "0 */10 * ? * * *")))).build();
     waitForEVToDisappear(tableConfig.getTableName());
-    addTableConfig(tableConfig);
+    addTableConfig(tableConfig, "TASK");
     waitForJobGroupNames(_controllerStarter.getTaskManager(),
         jgn -> jgn.size() == 1 && 
jgn.contains(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE),
         "JobGroupNames should have SegmentGenerationAndPushTask only");
@@ -321,7 +327,7 @@ public class PinotTaskManagerStatelessTest extends 
ControllerTest {
     tableConfig.setTaskConfig(new TableTaskConfig(
         ImmutableMap.of("SegmentGenerationAndPushTask", 
ImmutableMap.of("schedule", "0 */10 * ? * * *"),
             "MergeRollupTask", ImmutableMap.of("schedule", "0 */20 * ? * * 
*"))));
-    updateTableConfig(tableConfig);
+    updateTableConfig(tableConfig, "TASK");
 
     // Task is put into table config.
     TableConfig tableConfigAfterRestart =
@@ -407,6 +413,34 @@ public class PinotTaskManagerStatelessTest extends 
ControllerTest {
     }, TIMEOUT_IN_MS, 500L, "Cron expression didn't change to " + 
cronExpression);
   }
 
+  private void addTableConfig(TableConfig tableConfig, String 
validationTypesToSkip)
+      throws IOException {
+    String createTableUriStr =
+        String.format(_controllerRequestURLBuilder.forTableCreate() + 
"?validationTypesToSkip=%s",
+            validationTypesToSkip);
+    try {
+      HttpClient.wrapAndThrowHttpException(
+          _httpClient.sendJsonPostRequest(new URI(createTableUriStr), 
tableConfig.toJsonString(),
+              Collections.emptyMap()));
+    } catch (HttpErrorStatusException | URISyntaxException e) {
+      throw new IOException(e);
+    }
+  }
+
+  private void updateTableConfig(TableConfig tableConfig, String 
validationTypesToSkip)
+      throws IOException {
+    String updateTableUriStr = String.format(
+        
_controllerRequestURLBuilder.forUpdateTableConfig(tableConfig.getTableName()) + 
"?validationTypesToSkip=%s",
+        validationTypesToSkip);
+    try {
+      HttpClient.wrapAndThrowHttpException(
+          _httpClient.sendJsonPutRequest(new URI(updateTableUriStr), 
tableConfig.toJsonString(),
+              Collections.emptyMap()));
+    } catch (HttpErrorStatusException | URISyntaxException e) {
+      throw new IOException(e);
+    }
+  }
+
   @AfterClass
   public void tearDown() {
     stopZk();
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/util/TaskConfigUtilsTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/util/TaskConfigUtilsTest.java
new file mode 100644
index 0000000000..000bf9826c
--- /dev/null
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/util/TaskConfigUtilsTest.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.util;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
+import 
org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator;
+import 
org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
+import 
org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorRegistry;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.when;
+
+
+public class TaskConfigUtilsTest {
+  PinotTaskManager _mockTaskManager;
+  TaskGeneratorRegistry _mockTaskRegistry;
+  PinotTaskGenerator _taskGenerator;
+  private static final String TEST_TASK_TYPE = "myTask";
+  private static final String TEST_TABLE_NAME = "myTable";
+
+  @BeforeMethod
+  public void setup() {
+    _mockTaskManager = Mockito.mock(PinotTaskManager.class);
+    _mockTaskRegistry = Mockito.mock(TaskGeneratorRegistry.class);
+
+    _taskGenerator = new BaseTaskGenerator() {
+
+      @Override
+      public String getTaskType() {
+        return TEST_TASK_TYPE;
+      }
+
+      @Override
+      public List<PinotTaskConfig> generateTasks(List<TableConfig> 
tableConfigs) {
+        return List.of(new PinotTaskConfig(TEST_TASK_TYPE, new HashMap<>()));
+      }
+
+      @Override
+      public void validateTaskConfigs(TableConfig tableConfig, Map<String, 
String> taskConfigs) {
+        throw new RuntimeException("TableConfig validation failed");
+      }
+    };
+
+    
when(_mockTaskRegistry.getTaskGenerator(TEST_TASK_TYPE)).thenReturn(_taskGenerator);
+    
when(_mockTaskManager.getTaskGeneratorRegistry()).thenReturn(_mockTaskRegistry);
+  }
+
+  @Test (expectedExceptions = RuntimeException.class)
+  public void testValidateTableTaskConfigsValidationException() {
+    TableTaskConfig tableTaskConfig =
+        new TableTaskConfig(ImmutableMap.of(TEST_TASK_TYPE, 
ImmutableMap.of("schedule", "0 */10 * ? * * *")));
+    TableConfig tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TEST_TABLE_NAME).setTaskConfig(tableTaskConfig).build();
+    TaskConfigUtils.validateTaskConfigs(tableConfig, _mockTaskManager, null);
+  }
+
+  @Test (expectedExceptions = RuntimeException.class)
+  public void testValidateTableTaskConfigsUnknownTaskType() {
+    TableTaskConfig tableTaskConfig =
+        new TableTaskConfig(ImmutableMap.of("otherTask", 
ImmutableMap.of("schedule", "0 */10 * ? * * *")));
+    TableConfig tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TEST_TABLE_NAME).setTaskConfig(tableTaskConfig).build();
+    TaskConfigUtils.validateTaskConfigs(tableConfig, _mockTaskManager, null);
+  }
+
+  @Test
+  public void testCommonTaskValidations() {
+    // invalid schedule
+    HashMap<String, String> invalidScheduleConfig = new HashMap<>();
+    invalidScheduleConfig.put("schedule", "invalidSchedule");
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TEST_TABLE_NAME).setTaskConfig(
+        new TableTaskConfig(ImmutableMap.of(TEST_TASK_TYPE, 
invalidScheduleConfig))).build();
+
+    try {
+      TaskConfigUtils.doCommonTaskValidations(tableConfig, TEST_TASK_TYPE, 
invalidScheduleConfig);
+      Assert.fail();
+    } catch (IllegalStateException e) {
+      Assert.assertTrue(e.getMessage().contains("contains an invalid cron 
schedule"));
+    }
+
+    // invalid allowDownloadFromServer config
+    HashMap<String, String> invalidAllowDownloadFromServerConfig = new 
HashMap<>();
+    invalidAllowDownloadFromServerConfig.put("allowDownloadFromServer", 
"true");
+    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TEST_TABLE_NAME).setTaskConfig(
+        new TableTaskConfig(ImmutableMap.of(TEST_TASK_TYPE, 
invalidAllowDownloadFromServerConfig))).build();
+    try {
+      TaskConfigUtils.doCommonTaskValidations(tableConfig, TEST_TASK_TYPE, 
invalidAllowDownloadFromServerConfig);
+      Assert.fail();
+    } catch (IllegalStateException e) {
+      Assert.assertTrue(e.getMessage().contains(
+          "allowDownloadFromServer set to true, but " + 
"peerSegmentDownloadScheme is not set in the table config"));
+    }
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java 
b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index 9b1b89b4b7..6ad497a4a3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -18,6 +18,12 @@
  */
 package org.apache.pinot.core.common;
 
+import java.util.EnumSet;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+import static org.apache.pinot.segment.spi.AggregationFunctionType.*;
+
+
 public class MinionConstants {
   private MinionConstants() {
   }
@@ -138,6 +144,19 @@ public class MinionConstants {
 
     @Deprecated // Replaced by MERGE_TYPE_KEY
     public static final String COLLECTOR_TYPE_KEY = "collectorType";
+
+    public static final String BUCKET_TIME_PERIOD_KEY = "bucketTimePeriod";
+    public static final String BUFFER_TIME_PERIOD_KEY = "bufferTimePeriod";
+    public static final String ROUND_BUCKET_TIME_PERIOD_KEY = 
"roundBucketTimePeriod";
+    public static final String MERGE_TYPE_KEY = "mergeType";
+    public static final String AGGREGATION_TYPE_KEY_SUFFIX = 
".aggregationType";
+
+    public final static EnumSet<AggregationFunctionType> 
AVAILABLE_CORE_VALUE_AGGREGATORS =
+        EnumSet.of(MIN, MAX, SUM, DISTINCTCOUNTHLL, DISTINCTCOUNTRAWHLL, 
DISTINCTCOUNTTHETASKETCH,
+            DISTINCTCOUNTRAWTHETASKETCH, DISTINCTCOUNTTUPLESKETCH, 
DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH,
+            SUMVALUESINTEGERSUMTUPLESKETCH, AVGVALUEINTEGERSUMTUPLESKETCH, 
DISTINCTCOUNTHLLPLUS,
+            DISTINCTCOUNTRAWHLLPLUS, DISTINCTCOUNTCPCSKETCH, 
DISTINCTCOUNTRAWCPCSKETCH, DISTINCTCOUNTULL,
+            DISTINCTCOUNTRAWULL);
   }
 
   // Generate segment and push to controller based on batch ingestion configs
@@ -170,6 +189,11 @@ public class MinionConstants {
      */
     public static final String VALID_DOC_IDS_TYPE = "validDocIdsType";
 
+    /**
+     * Value for the key VALID_DOC_IDS_TYPE
+     */
+    public static final String SNAPSHOT = "snapshot";
+
     /**
      * number of segments to query in one batch to fetch valid doc id 
metadata, by default 500
      */
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
index 0323e26747..73ff19ebef 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.plugin.minion.tasks.realtimetoofflinesegments;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -37,11 +38,15 @@ import 
org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtil
 import org.apache.pinot.core.common.MinionConstants;
 import 
org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask;
 import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.core.segment.processing.framework.MergeType;
 import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.spi.annotations.minion.TaskGenerator;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableTaskConfig;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.CommonConstants.Segment;
 import org.apache.pinot.spi.utils.TimeUtils;
 import org.slf4j.Logger;
@@ -314,4 +319,45 @@ public class RealtimeToOfflineSegmentsTaskGenerator 
extends BaseTaskGenerator {
     }
     return realtimeToOfflineSegmentsTaskMetadata.getWatermarkMs();
   }
+
+  /**
+   * Validates the RealtimeToOfflineSegmentsTask configs of a table.
+   *
+   * <p>Checks that the table is not an upsert table, that the buffer/bucket/round-bucket time periods can be parsed,
+   * that the merge type is one of CONCAT, ROLLUP or DEDUP (case-insensitive), and that every
+   * {@code <column>.aggregationType} entry names a column of the table schema and an aggregation function type
+   * listed in {@code AVAILABLE_CORE_VALUE_AGGREGATORS}.
+   *
+   * @param tableConfig table config the task is attached to
+   * @param taskConfigs configs of the RealtimeToOfflineSegmentsTask for that table
+   * @throws IllegalStateException if the table is an upsert table, the schema cannot be found, the merge type is
+   *     unknown, or an aggregation entry refers to an unknown column or an unsupported aggregation type
+   * @throws IllegalArgumentException if one of the time periods is malformed
+   */
+  @Override
+  public void validateTaskConfigs(TableConfig tableConfig, Map<String, String> taskConfigs) {
+    // check table is not upsert
+    Preconditions.checkState(tableConfig.getUpsertMode() == UpsertConfig.Mode.NONE,
+        "RealtimeToOfflineTask doesn't support upsert table!");
+    // check no malformed period; the parsed values are discarded, only the parse failure matters
+    TimeUtils.convertPeriodToMillis(
+        taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUFFER_TIME_PERIOD_KEY, "2d"));
+    TimeUtils.convertPeriodToMillis(
+        taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUCKET_TIME_PERIOD_KEY, "1d"));
+    TimeUtils.convertPeriodToMillis(
+        taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.ROUND_BUCKET_TIME_PERIOD_KEY, "1s"));
+    // check mergeType is correct
+    String mergeType =
+        taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.MERGE_TYPE_KEY, MergeType.CONCAT.name()).toUpperCase();
+    Preconditions.checkState(
+        ImmutableSet.of(MergeType.CONCAT.name(), MergeType.ROLLUP.name(), MergeType.DEDUP.name()).contains(mergeType),
+        "MergeType must be one of [CONCAT, ROLLUP, DEDUP]!");
+
+    Schema schema = _clusterInfoAccessor.getPinotHelixResourceManager().getSchemaForTableConfig(tableConfig);
+    // fail with a descriptive error instead of a bare NullPointerException when no schema is available
+    Preconditions.checkState(schema != null, "Schema not found for table: %s", tableConfig.getTableName());
+    // check no mis-configured columns
+    Set<String> columnNames = schema.getColumnNames();
+    for (Map.Entry<String, String> entry : taskConfigs.entrySet()) {
+      String key = entry.getKey();
+      if (!key.endsWith(RealtimeToOfflineSegmentsTask.AGGREGATION_TYPE_KEY_SUFFIX)) {
+        continue;
+      }
+      String column = StringUtils.removeEnd(key, RealtimeToOfflineSegmentsTask.AGGREGATION_TYPE_KEY_SUFFIX);
+      Preconditions.checkState(columnNames.contains(column), "Column \"%s\" not found in schema!", key);
+      String aggregationType = entry.getValue();
+      try {
+        // check that it's a valid aggregation function type
+        AggregationFunctionType aft = AggregationFunctionType.getAggregationFunctionType(aggregationType);
+        // check that a value aggregator is available
+        if (!RealtimeToOfflineSegmentsTask.AVAILABLE_CORE_VALUE_AGGREGATORS.contains(aft)) {
+          throw new IllegalArgumentException("ValueAggregator not enabled for type: " + aft);
+        }
+      } catch (IllegalArgumentException e) {
+        // keep the original failure as the cause so the precise reason is not lost
+        throw new IllegalStateException(
+            String.format("Column \"%s\" has invalid aggregate type: %s", key, aggregationType), e);
+      }
+    }
+  }
 }
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
index 9f64e30cf1..ae74bc8ca9 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
@@ -304,4 +304,51 @@ public class UpsertCompactionTaskGenerator extends 
BaseTaskGenerator {
     }
     return true;
   }
+
+  /**
+   * Validates the UpsertCompactionTask configs of a table.
+   *
+   * <p>The table must be a realtime table with upsert enabled. An optional buffer time period must be parseable,
+   * {@code invalidRecordsThresholdPercent} must lie in [0, 100], {@code invalidRecordsThresholdCount} must be at
+   * least 1, and at least one of the two thresholds must be present. Depending on the configured validDocIdsType,
+   * the upsert config must additionally have snapshots enabled or a delete record column set.
+   *
+   * @param tableConfig table config the task is attached to
+   * @param taskConfigs configs of the UpsertCompactionTask for that table
+   */
+  @Override
+  public void validateTaskConfigs(TableConfig tableConfig, Map<String, String> taskConfigs) {
+    // table-level prerequisites: realtime table with upsert enabled
+    Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME,
+        "UpsertCompactionTask only supports realtime tables!");
+    Preconditions.checkState(tableConfig.isUpsertEnabled(), "Upsert must be enabled for UpsertCompactionTask");
+
+    // the buffer period is optional, but has to parse when it is given
+    if (taskConfigs.containsKey(UpsertCompactionTask.BUFFER_TIME_PERIOD_KEY)) {
+      TimeUtils.convertPeriodToMillis(taskConfigs.get(UpsertCompactionTask.BUFFER_TIME_PERIOD_KEY));
+    }
+
+    // range-check whichever thresholds are given
+    boolean hasThresholdPercent = taskConfigs.containsKey(UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_PERCENT);
+    if (hasThresholdPercent) {
+      double thresholdPercent =
+          Double.parseDouble(taskConfigs.get(UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_PERCENT));
+      Preconditions.checkState(thresholdPercent >= 0 && thresholdPercent <= 100,
+          "invalidRecordsThresholdPercent must be >= 0 and <= 100");
+    }
+    boolean hasThresholdCount = taskConfigs.containsKey(UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_COUNT);
+    if (hasThresholdCount) {
+      long thresholdCount = Long.parseLong(taskConfigs.get(UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_COUNT));
+      Preconditions.checkState(thresholdCount >= 1, "invalidRecordsThresholdCount must be >= 1");
+    }
+    // at least one of the two thresholds is mandatory
+    Preconditions.checkState(hasThresholdPercent || hasThresholdCount,
+        "invalidRecordsThresholdPercent or invalidRecordsThresholdCount or both must be provided");
+
+    // NOTE(review): the default is the lower-case "snapshot" constant while the comparison below uses
+    // ValidDocIdsType.SNAPSHOT.toString(); confirm the two match, otherwise the default skips the snapshot check.
+    String validDocIdsType =
+        taskConfigs.getOrDefault(UpsertCompactionTask.VALID_DOC_IDS_TYPE, UpsertCompactionTask.SNAPSHOT);
+    boolean usesSnapshot = validDocIdsType.equals(ValidDocIdsType.SNAPSHOT.toString());
+    boolean usesInMemoryWithDelete = validDocIdsType.equals(ValidDocIdsType.IN_MEMORY_WITH_DELETE.toString());
+    if (!usesSnapshot && !usesInMemoryWithDelete) {
+      // no further requirement on the upsert config for the remaining types
+      return;
+    }
+    UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
+    Preconditions.checkNotNull(upsertConfig, "UpsertConfig must be provided for UpsertCompactionTask");
+    if (usesSnapshot) {
+      Preconditions.checkState(upsertConfig.isEnableSnapshot(),
+          "'enableSnapshot' from UpsertConfig must be enabled for UpsertCompactionTask with validDocIdsType = %s",
+          validDocIdsType);
+    } else {
+      Preconditions.checkNotNull(upsertConfig.getDeleteRecordColumn(),
+          "deleteRecordColumn must be provided for UpsertCompactionTask with validDocIdsType = %s", validDocIdsType);
+    }
+  }
 }
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java
index 84169f3b93..49a9fd8d57 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.plugin.minion.tasks.realtimetoofflinesegments;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import java.util.Collections;
 import java.util.HashMap;
@@ -28,6 +29,7 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.task.TaskState;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
 import org.apache.pinot.core.common.MinionConstants;
 import 
org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask;
@@ -35,9 +37,13 @@ import org.apache.pinot.core.minion.PinotTaskConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableTaskConfig;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.stream.StreamConfigProperties;
 import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -508,6 +514,137 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
     assertTrue(pinotTaskConfigs.isEmpty());
   }
 
+  /**
+   * Verifies {@code validateTaskConfigs} of {@link RealtimeToOfflineSegmentsTaskGenerator}: a well-formed task config
+   * is accepted, while an upsert table, a malformed period, an unknown merge type, an aggregation on a column missing
+   * from the schema, and an unknown or unsupported aggregation type are each rejected.
+   */
+  @Test
+  public void testRealtimeToOfflineSegmentsTaskConfig() {
+    // the generator resolves the schema through the cluster info accessor, so stub that call chain
+    Schema schema = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
+        .addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
+        .addDateTime(TIME_COLUMN_NAME, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
+        .setPrimaryKeyColumns(Lists.newArrayList("myCol")).build();
+    PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
+    when(resourceManager.getSchemaForTableConfig(Mockito.any())).thenReturn(schema);
+    ClusterInfoAccessor clusterInfoAccessor = mock(ClusterInfoAccessor.class);
+    when(clusterInfoAccessor.getPinotHelixResourceManager()).thenReturn(resourceManager);
+
+    RealtimeToOfflineSegmentsTaskGenerator generator = new RealtimeToOfflineSegmentsTaskGenerator();
+    generator.init(clusterInfoAccessor);
+
+    Map<String, String> baseTaskConfig =
+        ImmutableMap.of("schedule", "0 */10 * ? * * *", "bucketTimePeriod", "6h", "bufferTimePeriod", "5d", "mergeType",
+            "rollup", "myCol.aggregationType", "max");
+    Map<String, String> pushTaskConfig = ImmutableMap.of("schedule", "0 */10 * ? * * *");
+
+    // validate valid config
+    generator.validateTaskConfigs(r2oOfflineTableConfig(baseTaskConfig, pushTaskConfig), baseTaskConfig);
+
+    // invalid Upsert config with RealtimeToOfflineTask
+    TableConfig upsertTableConfig =
+        new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME)
+            .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL)).setTaskConfig(new TableTaskConfig(
+                ImmutableMap.of("RealtimeToOfflineSegmentsTask", baseTaskConfig, "SegmentGenerationAndPushTask",
+                    pushTaskConfig))).build();
+    assertR2OConfigRejected(generator, upsertTableConfig, baseTaskConfig, IllegalStateException.class,
+        "RealtimeToOfflineTask doesn't support upsert table");
+
+    // invalid period
+    Map<String, String> invalidPeriodConfig = copyWithEntry(baseTaskConfig, "roundBucketTimePeriod", "garbage");
+    assertR2OConfigRejected(generator, r2oOfflineTableConfig(invalidPeriodConfig, pushTaskConfig),
+        invalidPeriodConfig, IllegalArgumentException.class, "Invalid time spec");
+
+    // invalid mergeType
+    Map<String, String> invalidMergeTypeConfig = copyWithEntry(baseTaskConfig, "mergeType", "garbage");
+    assertR2OConfigRejected(generator, r2oOfflineTableConfig(invalidMergeTypeConfig, pushTaskConfig),
+        invalidMergeTypeConfig, IllegalStateException.class, "MergeType must be one of");
+
+    // invalid column
+    Map<String, String> invalidColumnConfig = copyWithEntry(baseTaskConfig, "score.aggregationType", "max");
+    assertR2OConfigRejected(generator, r2oOfflineTableConfig(invalidColumnConfig, pushTaskConfig),
+        invalidColumnConfig, IllegalStateException.class, "not found in schema");
+
+    // invalid agg
+    Map<String, String> invalidAggConfig = copyWithEntry(baseTaskConfig, "myCol.aggregationType", "garbage");
+    assertR2OConfigRejected(generator, r2oOfflineTableConfig(invalidAggConfig, pushTaskConfig), invalidAggConfig,
+        IllegalStateException.class, "has invalid aggregate type");
+
+    // aggregation function that exists but has no ValueAggregator available
+    Map<String, String> unsupportedAggConfig = copyWithEntry(baseTaskConfig, "myCol.aggregationType", "Histogram");
+    assertR2OConfigRejected(generator, r2oOfflineTableConfig(unsupportedAggConfig, pushTaskConfig),
+        unsupportedAggConfig, IllegalStateException.class, "has invalid aggregate type");
+
+    // valid aggs
+    for (String validAggregationType : new String[]{"distinctCountHLL", "distinctCountHLLPlus"}) {
+      Map<String, String> validAggConfig =
+          copyWithEntry(baseTaskConfig, "myCol.aggregationType", validAggregationType);
+      generator.validateTaskConfigs(r2oOfflineTableConfig(validAggConfig, pushTaskConfig), validAggConfig);
+    }
+  }
+
+  // Returns a mutable copy of the given task config with one entry added or replaced.
+  private static Map<String, String> copyWithEntry(Map<String, String> base, String key, String value) {
+    Map<String, String> copy = new HashMap<>(base);
+    copy.put(key, value);
+    return copy;
+  }
+
+  // Builds an OFFLINE table config carrying the given RealtimeToOfflineSegmentsTask and push task configs.
+  private static TableConfig r2oOfflineTableConfig(Map<String, String> realtimeToOfflineTaskConfig,
+      Map<String, String> segmentGenerationAndPushTaskConfig) {
+    return new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTaskConfig(
+        new TableTaskConfig(ImmutableMap.of("RealtimeToOfflineSegmentsTask", realtimeToOfflineTaskConfig,
+            "SegmentGenerationAndPushTask", segmentGenerationAndPushTaskConfig))).build();
+  }
+
+  // Asserts that validation fails with the expected exception type and a message containing the given fragment.
+  private static <T extends Throwable> void assertR2OConfigRejected(RealtimeToOfflineSegmentsTaskGenerator generator,
+      TableConfig tableConfig, Map<String, String> taskConfig, Class<T> expectedType, String expectedMessagePart) {
+    T error = Assert.expectThrows(expectedType, () -> generator.validateTaskConfigs(tableConfig, taskConfig));
+    Assert.assertTrue(error.getMessage().contains(expectedMessagePart));
+  }
+
   private SegmentZKMetadata getSegmentZKMetadata(String segmentName, Status 
status, long startTime, long endTime,
       TimeUnit timeUnit, String downloadURL) {
     SegmentZKMetadata realtimeSegmentZKMetadata = new 
SegmentZKMetadata(segmentName);
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
index ec8d8ea786..604c1aa476 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.plugin.minion.tasks.upsertcompaction;
 
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -330,6 +331,63 @@ public class UpsertCompactionTaskGeneratorTest {
         _completedSegment2.getSegmentName());
   }
 
+  /**
+   * Verifies {@code validateTaskConfigs} of the upsert compaction task generator for the threshold settings.
+   *
+   * <p>All cases share the same snapshot-enabled FULL upsert config, so each negative case differs from the valid
+   * one only in the task config under test, and the expected error message is pinned so that a rejection for an
+   * unrelated reason does not let the test pass.
+   */
+  @Test
+  public void testUpsertCompactionTaskConfig() {
+    UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+    upsertConfig.setEnableSnapshot(true);
+
+    // valid: buffer period plus both thresholds
+    Map<String, String> validTaskConfig =
+        ImmutableMap.of("bufferTimePeriod", "5d", "invalidRecordsThresholdPercent", "1", "invalidRecordsThresholdCount",
+            "1");
+    _taskGenerator.validateTaskConfigs(upsertCompactionTableConfig(upsertConfig, validTaskConfig), validTaskConfig);
+
+    // test with invalidRecordsThresholdPercents as 0
+    Map<String, String> zeroPercentTaskConfig = ImmutableMap.of("invalidRecordsThresholdPercent", "0");
+    _taskGenerator.validateTaskConfigs(upsertCompactionTableConfig(upsertConfig, zeroPercentTaskConfig),
+        zeroPercentTaskConfig);
+
+    // test with invalid invalidRecordsThresholdPercents as -1 and 110
+    assertUpsertCompactionConfigRejected(upsertConfig, ImmutableMap.of("invalidRecordsThresholdPercent", "-1"),
+        "invalidRecordsThresholdPercent must be >= 0 and <= 100");
+    assertUpsertCompactionConfigRejected(upsertConfig, ImmutableMap.of("invalidRecordsThresholdPercent", "110"),
+        "invalidRecordsThresholdPercent must be >= 0 and <= 100");
+
+    // test with invalid invalidRecordsThresholdCount
+    assertUpsertCompactionConfigRejected(upsertConfig, ImmutableMap.of("invalidRecordsThresholdCount", "0"),
+        "invalidRecordsThresholdCount must be >= 1");
+
+    // test without invalidRecordsThresholdPercent or invalidRecordsThresholdCount
+    assertUpsertCompactionConfigRejected(upsertConfig, ImmutableMap.of("bufferTimePeriod", "5d"),
+        "invalidRecordsThresholdPercent or invalidRecordsThresholdCount or both must be provided");
+  }
+
+  // Builds a REALTIME table config with the given upsert config and UpsertCompactionTask config.
+  private static TableConfig upsertCompactionTableConfig(UpsertConfig upsertConfig, Map<String, String> taskConfig) {
+    return new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
+        .setUpsertConfig(upsertConfig)
+        .setTaskConfig(new TableTaskConfig(ImmutableMap.of("UpsertCompactionTask", taskConfig)))
+        .build();
+  }
+
+  // Asserts that validation fails with an IllegalStateException whose message contains the given fragment.
+  private void assertUpsertCompactionConfigRejected(UpsertConfig upsertConfig, Map<String, String> taskConfig,
+      String expectedMessagePart) {
+    TableConfig tableConfig = upsertCompactionTableConfig(upsertConfig, taskConfig);
+    IllegalStateException error = Assert.expectThrows(IllegalStateException.class,
+        () -> _taskGenerator.validateTaskConfigs(tableConfig, taskConfig));
+    Assert.assertTrue(error.getMessage().contains(expectedMessagePart), error.getMessage());
+  }
+
   private Map<String, String> getCompactionConfigs(String 
invalidRecordsThresholdPercent,
       String invalidRecordsThresholdCount) {
     Map<String, String> compactionConfigs = new HashMap<>();
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 959fdfc51d..722fb3e4b9 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -40,7 +40,6 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.request.context.FunctionContext;
 import org.apache.pinot.common.request.context.RequestContextUtils;
-import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
 import org.apache.pinot.common.tier.TierFactory;
 import org.apache.pinot.common.utils.config.TagNameUtils;
 import org.apache.pinot.segment.local.function.FunctionEvaluator;
@@ -94,7 +93,6 @@ import org.apache.pinot.spi.utils.DataSizeUtils;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
 import org.apache.pinot.spi.utils.TimeUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.quartz.CronScheduleBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -110,11 +108,9 @@ public final class TableConfigUtils {
   }
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(TableConfigUtils.class);
-  private static final String SCHEDULE_KEY = "schedule";
   private static final String STAR_TREE_CONFIG_NAME = "StarTreeIndex Config";
 
   // supported TableTaskTypes, must be identical to the one return in the impl 
of {@link PinotTaskGenerator}.
-  private static final String REALTIME_TO_OFFLINE_TASK_TYPE = 
"RealtimeToOfflineSegmentsTask";
   private static final String UPSERT_COMPACTION_TASK_TYPE = 
"UpsertCompactionTask";
 
   // this is duplicate with KinesisConfig.STREAM_TYPE, while instead of use 
KinesisConfig.STREAM_TYPE directly, we
@@ -192,9 +188,6 @@ public final class TableConfigUtils {
         validateUpsertAndDedupConfig(tableConfig, schema);
         validatePartialUpsertStrategies(tableConfig, schema);
       }
-      if (!skipTypes.contains(ValidationType.TASK)) {
-        validateTaskConfigs(tableConfig, schema);
-      }
 
       if (_enforcePoolBasedAssignment) {
         validateInstancePoolsNReplicaGroups(tableConfig);
@@ -665,121 +658,6 @@ public final class TableConfigUtils {
     }
   }
 
-  public final static EnumSet<AggregationFunctionType> 
AVAILABLE_CORE_VALUE_AGGREGATORS =
-      EnumSet.of(MIN, MAX, SUM, DISTINCTCOUNTHLL, DISTINCTCOUNTRAWHLL, 
DISTINCTCOUNTTHETASKETCH,
-          DISTINCTCOUNTRAWTHETASKETCH, DISTINCTCOUNTTUPLESKETCH, 
DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH,
-          SUMVALUESINTEGERSUMTUPLESKETCH, AVGVALUEINTEGERSUMTUPLESKETCH, 
DISTINCTCOUNTHLLPLUS, DISTINCTCOUNTRAWHLLPLUS,
-          DISTINCTCOUNTCPCSKETCH, DISTINCTCOUNTRAWCPCSKETCH, DISTINCTCOUNTULL, 
DISTINCTCOUNTRAWULL);
-
-  @VisibleForTesting
-  static void validateTaskConfigs(TableConfig tableConfig, Schema schema) {
-    TableTaskConfig taskConfig = tableConfig.getTaskConfig();
-    if (taskConfig != null) {
-      for (Map.Entry<String, Map<String, String>> taskConfigEntry : 
taskConfig.getTaskTypeConfigsMap().entrySet()) {
-        String taskTypeConfigName = taskConfigEntry.getKey();
-        Map<String, String> taskTypeConfig = taskConfigEntry.getValue();
-        // Task configuration cannot be null.
-        Preconditions.checkNotNull(taskTypeConfig,
-            String.format("Task configuration for \"%s\" cannot be null!", 
taskTypeConfigName));
-        // Schedule key for task config has to be set.
-        if (taskTypeConfig.containsKey(SCHEDULE_KEY)) {
-          String cronExprStr = taskTypeConfig.get(SCHEDULE_KEY);
-          try {
-            CronScheduleBuilder.cronSchedule(cronExprStr);
-          } catch (Exception e) {
-            throw new IllegalStateException(
-                String.format("Task %s contains an invalid cron schedule: %s", 
taskTypeConfigName, cronExprStr), e);
-          }
-        }
-        boolean isAllowDownloadFromServer = Boolean.parseBoolean(
-            
taskTypeConfig.getOrDefault(TableTaskConfig.MINION_ALLOW_DOWNLOAD_FROM_SERVER,
-                
String.valueOf(TableTaskConfig.DEFAULT_MINION_ALLOW_DOWNLOAD_FROM_SERVER)));
-        if (isAllowDownloadFromServer) {
-          
Preconditions.checkState(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme()
 != null,
-              String.format("Table %s has task %s with allowDownloadFromServer 
set to true, but "
-                      + "peerSegmentDownloadScheme is not set in the table 
config", tableConfig.getTableName(),
-                  taskTypeConfigName));
-        }
-        // Task Specific validation for REALTIME_TO_OFFLINE_TASK_TYPE
-        // TODO task specific validate logic should directly call to 
PinotTaskGenerator API
-        if (taskTypeConfigName.equals(REALTIME_TO_OFFLINE_TASK_TYPE)) {
-          // check table is not upsert
-          Preconditions.checkState(tableConfig.getUpsertMode() == 
UpsertConfig.Mode.NONE,
-              "RealtimeToOfflineTask doesn't support upsert table!");
-          // check no malformed period
-          
TimeUtils.convertPeriodToMillis(taskTypeConfig.getOrDefault("bufferTimePeriod", 
"2d"));
-          
TimeUtils.convertPeriodToMillis(taskTypeConfig.getOrDefault("bucketTimePeriod", 
"1d"));
-          
TimeUtils.convertPeriodToMillis(taskTypeConfig.getOrDefault("roundBucketTimePeriod",
 "1s"));
-          // check mergeType is correct
-          Preconditions.checkState(ImmutableSet.of("CONCAT", "ROLLUP", "DEDUP")
-                  .contains(taskTypeConfig.getOrDefault("mergeType", 
"CONCAT").toUpperCase()),
-              "MergeType must be one of [CONCAT, ROLLUP, DEDUP]!");
-          // check no mis-configured columns
-          Set<String> columnNames = schema.getColumnNames();
-          for (Map.Entry<String, String> entry : taskTypeConfig.entrySet()) {
-            if (entry.getKey().endsWith(".aggregationType")) {
-              
Preconditions.checkState(columnNames.contains(StringUtils.removeEnd(entry.getKey(),
 ".aggregationType")),
-                  String.format("Column \"%s\" not found in schema!", 
entry.getKey()));
-              try {
-                // check that it's a valid aggregation function type
-                AggregationFunctionType aft = 
AggregationFunctionType.getAggregationFunctionType(entry.getValue());
-                // check that a value aggregator is available
-                if (!AVAILABLE_CORE_VALUE_AGGREGATORS.contains(aft)) {
-                  throw new IllegalArgumentException("ValueAggregator not 
enabled for type: " + aft.toString());
-                }
-              } catch (IllegalArgumentException e) {
-                String err =
-                    String.format("Column \"%s\" has invalid aggregate type: 
%s", entry.getKey(), entry.getValue());
-                throw new IllegalStateException(err);
-              }
-            }
-          }
-        } else if (taskTypeConfigName.equals(UPSERT_COMPACTION_TASK_TYPE)) {
-          // check table is realtime
-          Preconditions.checkState(tableConfig.getTableType() == 
TableType.REALTIME,
-              "UpsertCompactionTask only supports realtime tables!");
-          // check upsert enabled
-          Preconditions.checkState(tableConfig.isUpsertEnabled(), "Upsert must 
be enabled for UpsertCompactionTask");
-
-          // check no malformed period
-          if (taskTypeConfig.containsKey("bufferTimePeriod")) {
-            
TimeUtils.convertPeriodToMillis(taskTypeConfig.get("bufferTimePeriod"));
-          }
-          // check invalidRecordsThresholdPercent
-          if (taskTypeConfig.containsKey("invalidRecordsThresholdPercent")) {
-            
Preconditions.checkState(Double.parseDouble(taskTypeConfig.get("invalidRecordsThresholdPercent"))
 >= 0
-                    && 
Double.parseDouble(taskTypeConfig.get("invalidRecordsThresholdPercent")) <= 100,
-                "invalidRecordsThresholdPercent must be >= 0 and <= 100");
-          }
-          // check invalidRecordsThresholdCount
-          if (taskTypeConfig.containsKey("invalidRecordsThresholdCount")) {
-            
Preconditions.checkState(Long.parseLong(taskTypeConfig.get("invalidRecordsThresholdCount"))
 >= 1,
-                "invalidRecordsThresholdCount must be >= 1");
-          }
-          // check that either invalidRecordsThresholdPercent or 
invalidRecordsThresholdCount was provided
-          Preconditions.checkState(
-              taskTypeConfig.containsKey("invalidRecordsThresholdPercent") || 
taskTypeConfig.containsKey(
-                  "invalidRecordsThresholdCount"),
-              "invalidRecordsThresholdPercent or invalidRecordsThresholdCount 
or both must be provided");
-          String validDocIdsType = 
taskTypeConfig.getOrDefault("validDocIdsType", "snapshot");
-          if (validDocIdsType.equals(ValidDocIdsType.SNAPSHOT.toString())) {
-            UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
-            Preconditions.checkNotNull(upsertConfig, "UpsertConfig must be 
provided for UpsertCompactionTask");
-            Preconditions.checkState(upsertConfig.isEnableSnapshot(), 
String.format(
-                "'enableSnapshot' from UpsertConfig must be enabled for 
UpsertCompactionTask with validDocIdsType = "
-                    + "%s", validDocIdsType));
-          } else if 
(validDocIdsType.equals(ValidDocIdsType.IN_MEMORY_WITH_DELETE.toString())) {
-            UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
-            Preconditions.checkNotNull(upsertConfig, "UpsertConfig must be 
provided for UpsertCompactionTask");
-            Preconditions.checkNotNull(upsertConfig.getDeleteRecordColumn(), 
String.format(
-                "deleteRecordColumn must be provided for " + 
"UpsertCompactionTask with validDocIdsType = %s",
-                validDocIdsType));
-          }
-        }
-      }
-    }
-  }
-
   /**
    * Validates the upsert-related configurations
    *  - check table type is realtime
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index 72992e99cf..98b0ba552c 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -44,7 +44,6 @@ import 
org.apache.pinot.spi.config.table.StarTreeAggregationConfig;
 import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableCustomConfig;
-import org.apache.pinot.spi.config.table.TableTaskConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.TagOverrideConfig;
 import org.apache.pinot.spi.config.table.TenantConfig;
@@ -2273,147 +2272,6 @@ public class TableConfigUtilsTest {
     });
   }
 
-  @Test
-  public void testTaskConfig() {
-    Schema schema =
-        new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol",
 FieldSpec.DataType.STRING)
-            .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG, 
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
-            .setPrimaryKeyColumns(Lists.newArrayList("myCol")).build();
-    Map<String, String> realtimeToOfflineTaskConfig =
-        ImmutableMap.of("schedule", "0 */10 * ? * * *", "bucketTimePeriod", 
"6h", "bufferTimePeriod", "5d", "mergeType",
-            "rollup", "myCol.aggregationType", "max");
-    Map<String, String> segmentGenerationAndPushTaskConfig = 
ImmutableMap.of("schedule", "0 */10 * ? * * *");
-    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTaskConfig(
-        new TableTaskConfig(ImmutableMap.of("RealtimeToOfflineSegmentsTask", 
realtimeToOfflineTaskConfig,
-            "SegmentGenerationAndPushTask", 
segmentGenerationAndPushTaskConfig))).build();
-
-    // validate valid config
-    TableConfigUtils.validateTaskConfigs(tableConfig, schema);
-
-    // invalid schedule
-    HashMap<String, String> invalidScheduleConfig = new 
HashMap<>(segmentGenerationAndPushTaskConfig);
-    invalidScheduleConfig.put("schedule", "garbage");
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTaskConfig(new
 TableTaskConfig(
-        ImmutableMap.of("RealtimeToOfflineSegmentsTask", 
realtimeToOfflineTaskConfig, "SegmentGenerationAndPushTask",
-            invalidScheduleConfig))).build();
-    try {
-      TableConfigUtils.validateTaskConfigs(tableConfig, schema);
-      Assert.fail();
-    } catch (IllegalStateException e) {
-      Assert.assertTrue(e.getMessage().contains("contains an invalid cron 
schedule"));
-    }
-
-    // invalid allowDownloadFromServer config
-    HashMap<String, String> invalidAllowDownloadFromServerConfig = new 
HashMap<>(segmentGenerationAndPushTaskConfig);
-    invalidAllowDownloadFromServerConfig.put("allowDownloadFromServer", 
"true");
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTaskConfig(new
 TableTaskConfig(
-        ImmutableMap.of("RealtimeToOfflineSegmentsTask", 
realtimeToOfflineTaskConfig, "SegmentGenerationAndPushTask",
-            invalidAllowDownloadFromServerConfig))).build();
-    try {
-      TableConfigUtils.validateTaskConfigs(tableConfig, schema);
-      Assert.fail();
-    } catch (IllegalStateException e) {
-      Assert.assertTrue(e.getMessage().contains("allowDownloadFromServer set 
to true, but "
-          + "peerSegmentDownloadScheme is not set in the table config"));
-    }
-
-    // invalid Upsert config with RealtimeToOfflineTask
-    tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN)
-        .setUpsertConfig(new 
UpsertConfig(UpsertConfig.Mode.FULL)).setStreamConfigs(getStreamConfigs()).setTaskConfig(
-            new 
TableTaskConfig(ImmutableMap.of("RealtimeToOfflineSegmentsTask", 
realtimeToOfflineTaskConfig,
-                "SegmentGenerationAndPushTask", 
segmentGenerationAndPushTaskConfig))).build();
-    try {
-      TableConfigUtils.validateTaskConfigs(tableConfig, schema);
-      Assert.fail();
-    } catch (IllegalStateException e) {
-      Assert.assertTrue(e.getMessage().contains("RealtimeToOfflineTask doesn't 
support upsert table"));
-    }
-
-    // validate that TASK config will be skipped with skip string.
-    TableConfigUtils.validate(tableConfig, schema, "TASK,UPSERT");
-
-    // invalid period
-    HashMap<String, String> invalidPeriodConfig = new 
HashMap<>(realtimeToOfflineTaskConfig);
-    invalidPeriodConfig.put("roundBucketTimePeriod", "garbage");
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTaskConfig(new
 TableTaskConfig(
-        ImmutableMap.of("RealtimeToOfflineSegmentsTask", invalidPeriodConfig, 
"SegmentGenerationAndPushTask",
-            segmentGenerationAndPushTaskConfig))).build();
-    try {
-      TableConfigUtils.validateTaskConfigs(tableConfig, schema);
-      Assert.fail();
-    } catch (IllegalArgumentException e) {
-      Assert.assertTrue(e.getMessage().contains("Invalid time spec"));
-    }
-
-    // invalid mergeType
-    HashMap<String, String> invalidMergeType = new 
HashMap<>(realtimeToOfflineTaskConfig);
-    invalidMergeType.put("mergeType", "garbage");
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTaskConfig(new
 TableTaskConfig(
-        ImmutableMap.of("RealtimeToOfflineSegmentsTask", invalidMergeType, 
"SegmentGenerationAndPushTask",
-            segmentGenerationAndPushTaskConfig))).build();
-    try {
-      TableConfigUtils.validateTaskConfigs(tableConfig, schema);
-      Assert.fail();
-    } catch (IllegalStateException e) {
-      Assert.assertTrue(e.getMessage().contains("MergeType must be one of"));
-    }
-
-    // invalid column
-    HashMap<String, String> invalidColumnConfig = new 
HashMap<>(realtimeToOfflineTaskConfig);
-    invalidColumnConfig.put("score.aggregationType", "max");
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTaskConfig(new
 TableTaskConfig(
-        ImmutableMap.of("RealtimeToOfflineSegmentsTask", invalidColumnConfig, 
"SegmentGenerationAndPushTask",
-            segmentGenerationAndPushTaskConfig))).build();
-    try {
-      TableConfigUtils.validateTaskConfigs(tableConfig, schema);
-      Assert.fail();
-    } catch (IllegalStateException e) {
-      Assert.assertTrue(e.getMessage().contains("not found in schema"));
-    }
-
-    // invalid agg
-    HashMap<String, String> invalidAggConfig = new 
HashMap<>(realtimeToOfflineTaskConfig);
-    invalidAggConfig.put("myCol.aggregationType", "garbage");
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTaskConfig(new
 TableTaskConfig(
-        ImmutableMap.of("RealtimeToOfflineSegmentsTask", invalidAggConfig, 
"SegmentGenerationAndPushTask",
-            segmentGenerationAndPushTaskConfig))).build();
-    try {
-      TableConfigUtils.validateTaskConfigs(tableConfig, schema);
-      Assert.fail();
-    } catch (IllegalStateException e) {
-      Assert.assertTrue(e.getMessage().contains("has invalid aggregate type"));
-    }
-
-    // aggregation function that exists but has no ValueAggregator available
-    HashMap<String, String> invalidAgg2Config = new 
HashMap<>(realtimeToOfflineTaskConfig);
-    invalidAgg2Config.put("myCol.aggregationType", "Histogram");
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTaskConfig(new
 TableTaskConfig(
-        ImmutableMap.of("RealtimeToOfflineSegmentsTask", invalidAgg2Config, 
"SegmentGenerationAndPushTask",
-            segmentGenerationAndPushTaskConfig))).build();
-    try {
-      TableConfigUtils.validateTaskConfigs(tableConfig, schema);
-      Assert.fail();
-    } catch (IllegalStateException e) {
-      Assert.assertTrue(e.getMessage().contains("has invalid aggregate type"));
-    }
-
-    // valid agg
-    HashMap<String, String> validAggConfig = new 
HashMap<>(realtimeToOfflineTaskConfig);
-    validAggConfig.put("myCol.aggregationType", "distinctCountHLL");
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTaskConfig(new
 TableTaskConfig(
-        ImmutableMap.of("RealtimeToOfflineSegmentsTask", validAggConfig, 
"SegmentGenerationAndPushTask",
-            segmentGenerationAndPushTaskConfig))).build();
-    TableConfigUtils.validateTaskConfigs(tableConfig, schema);
-
-    // valid agg
-    HashMap<String, String> validAgg2Config = new 
HashMap<>(realtimeToOfflineTaskConfig);
-    validAgg2Config.put("myCol.aggregationType", "distinctCountHLLPlus");
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTaskConfig(new
 TableTaskConfig(
-        ImmutableMap.of("RealtimeToOfflineSegmentsTask", validAgg2Config, 
"SegmentGenerationAndPushTask",
-            segmentGenerationAndPushTaskConfig))).build();
-    TableConfigUtils.validateTaskConfigs(tableConfig, schema);
-  }
-
   @Test
   public void testValidateInstancePartitionsMap() {
     InstanceAssignmentConfig instanceAssignmentConfig = 
Mockito.mock(InstanceAssignmentConfig.class);
@@ -2548,67 +2406,6 @@ public class TableConfigUtilsTest {
     }
   }
 
-  @Test
-  public void testUpsertCompactionTaskConfig() {
-    Schema schema =
-        new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol",
 FieldSpec.DataType.STRING)
-            .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG, 
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
-            .setPrimaryKeyColumns(Lists.newArrayList("myCol")).build();
-    Map<String, String> upsertCompactionTaskConfig =
-        ImmutableMap.of("bufferTimePeriod", "5d", 
"invalidRecordsThresholdPercent", "1", "invalidRecordsThresholdCount",
-            "1");
-    UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
-    upsertConfig.setEnableSnapshot(true);
-    TableConfig tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
-        .setUpsertConfig(upsertConfig)
-        .setTaskConfig(new 
TableTaskConfig(ImmutableMap.of("UpsertCompactionTask", 
upsertCompactionTaskConfig)))
-        .build();
-
-    TableConfigUtils.validateTaskConfigs(tableConfig, schema);
-
-    // test with invalidRecordsThresholdPercents as 0
-    upsertCompactionTaskConfig = 
ImmutableMap.of("invalidRecordsThresholdPercent", "0");
-    TableConfig zeroPercentTableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
-        .setUpsertConfig(upsertConfig)
-        .setTaskConfig(new 
TableTaskConfig(ImmutableMap.of("UpsertCompactionTask", 
upsertCompactionTaskConfig)))
-        .build();
-    TableConfigUtils.validateTaskConfigs(zeroPercentTableConfig, schema);
-
-    // test with invalid invalidRecordsThresholdPercents as -1 and 110
-    upsertCompactionTaskConfig = 
ImmutableMap.of("invalidRecordsThresholdPercent", "-1");
-    TableConfig negativePercentTableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
-        .setUpsertConfig(upsertConfig)
-        .setTaskConfig(new 
TableTaskConfig(ImmutableMap.of("UpsertCompactionTask", 
upsertCompactionTaskConfig)))
-        .build();
-    Assert.assertThrows(IllegalStateException.class,
-        () -> TableConfigUtils.validateTaskConfigs(negativePercentTableConfig, 
schema));
-    upsertCompactionTaskConfig = 
ImmutableMap.of("invalidRecordsThresholdPercent", "110");
-    TableConfig hundredTenPercentTableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
-        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL))
-        .setTaskConfig(new 
TableTaskConfig(ImmutableMap.of("UpsertCompactionTask", 
upsertCompactionTaskConfig)))
-        .build();
-    Assert.assertThrows(IllegalStateException.class,
-        () -> 
TableConfigUtils.validateTaskConfigs(hundredTenPercentTableConfig, schema));
-
-    // test with invalid invalidRecordsThresholdCount
-    upsertCompactionTaskConfig = 
ImmutableMap.of("invalidRecordsThresholdCount", "0");
-    TableConfig invalidCountTableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
-        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL))
-        .setTaskConfig(new 
TableTaskConfig(ImmutableMap.of("UpsertCompactionTask", 
upsertCompactionTaskConfig)))
-        .build();
-    Assert.assertThrows(IllegalStateException.class,
-        () -> TableConfigUtils.validateTaskConfigs(invalidCountTableConfig, 
schema));
-
-    // test without invalidRecordsThresholdPercent or 
invalidRecordsThresholdCount
-    upsertCompactionTaskConfig = ImmutableMap.of("bufferTimePeriod", "5d");
-    TableConfig invalidTableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
-        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL))
-        .setTaskConfig(new 
TableTaskConfig(ImmutableMap.of("UpsertCompactionTask", 
upsertCompactionTaskConfig)))
-        .build();
-    Assert.assertThrows(IllegalStateException.class,
-        () -> TableConfigUtils.validateTaskConfigs(invalidTableConfig, 
schema));
-  }
-
   @Test
   public void testValidatePartitionedReplicaGroupInstance() {
     String partitionColumn = "testPartitionCol";


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to