Jackie-Jiang commented on a change in pull request #7523:
URL: https://github.com/apache/pinot/pull/7523#discussion_r723759353



##########
File path: 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
##########
@@ -303,17 +306,51 @@ public static void validateIngestionConfig(TableConfig 
tableConfig, @Nullable Sc
     }
   }
 
-  private static void validateTaskConfigs(TableConfig tableConfig) {
+  static void validateTaskConfigs(TableConfig tableConfig, Schema schema) {
     TableTaskConfig taskConfig = tableConfig.getTaskConfig();
-    if (taskConfig != null && 
taskConfig.isTaskTypeEnabled(SEGMENT_GENERATION_AND_PUSH_TASK_TYPE)) {
-      Map<String, String> taskTypeConfig = 
taskConfig.getConfigsForTaskType(SEGMENT_GENERATION_AND_PUSH_TASK_TYPE);
-      if (taskTypeConfig != null && taskTypeConfig.containsKey(SCHEDULE_KEY)) {
-        String cronExprStr = taskTypeConfig.get(SCHEDULE_KEY);
-        try {
-          CronScheduleBuilder.cronSchedule(cronExprStr);
-        } catch (Exception e) {
-          throw new IllegalStateException(
-              String.format("SegmentGenerationAndPushTask contains an invalid 
cron schedule: %s", cronExprStr), e);
+    if (taskConfig != null) {
+      for (Map.Entry<String, Map<String, String>> taskConfigEntry : 
taskConfig.getTaskTypeConfigsMap().entrySet()) {
+        Map<String, String> taskTypeConfig = taskConfigEntry.getValue();
+        if (taskTypeConfig != null && 
taskTypeConfig.containsKey(SCHEDULE_KEY)) {

Review comment:
       ```suggestion
           if (taskTypeConfig.containsKey(SCHEDULE_KEY)) {
   ```

##########
File path: 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
##########
@@ -303,17 +306,51 @@ public static void validateIngestionConfig(TableConfig 
tableConfig, @Nullable Sc
     }
   }
 
-  private static void validateTaskConfigs(TableConfig tableConfig) {
+  static void validateTaskConfigs(TableConfig tableConfig, Schema schema) {
     TableTaskConfig taskConfig = tableConfig.getTaskConfig();
-    if (taskConfig != null && 
taskConfig.isTaskTypeEnabled(SEGMENT_GENERATION_AND_PUSH_TASK_TYPE)) {
-      Map<String, String> taskTypeConfig = 
taskConfig.getConfigsForTaskType(SEGMENT_GENERATION_AND_PUSH_TASK_TYPE);
-      if (taskTypeConfig != null && taskTypeConfig.containsKey(SCHEDULE_KEY)) {
-        String cronExprStr = taskTypeConfig.get(SCHEDULE_KEY);
-        try {
-          CronScheduleBuilder.cronSchedule(cronExprStr);
-        } catch (Exception e) {
-          throw new IllegalStateException(
-              String.format("SegmentGenerationAndPushTask contains an invalid 
cron schedule: %s", cronExprStr), e);
+    if (taskConfig != null) {
+      for (Map.Entry<String, Map<String, String>> taskConfigEntry : 
taskConfig.getTaskTypeConfigsMap().entrySet()) {
+        Map<String, String> taskTypeConfig = taskConfigEntry.getValue();
+        if (taskTypeConfig != null && 
taskTypeConfig.containsKey(SCHEDULE_KEY)) {
+          String cronExprStr = taskTypeConfig.get(SCHEDULE_KEY);
+          try {
+            CronScheduleBuilder.cronSchedule(cronExprStr);
+          } catch (Exception e) {
+            throw new IllegalStateException(String.format(
+                "Task %s contains an invalid cron schedule: %s", 
taskConfigEntry.getKey(), cronExprStr), e);
+          }
+        }
+        // Task Specific validation for REALTIME_TO_OFFLINE_TASK_TYPE
+        // TODO task specific validate logic should directly call to 
PinotTaskGenerator API
+        if (taskConfigEntry.getKey().equals(REALTIME_TO_OFFLINE_TASK_TYPE)) {
+          if (taskTypeConfig != null) {

Review comment:
       Remove this redundant `taskTypeConfig != null` check.

##########
File path: 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
##########
@@ -303,17 +306,51 @@ public static void validateIngestionConfig(TableConfig 
tableConfig, @Nullable Sc
     }
   }
 
-  private static void validateTaskConfigs(TableConfig tableConfig) {
+  static void validateTaskConfigs(TableConfig tableConfig, Schema schema) {
     TableTaskConfig taskConfig = tableConfig.getTaskConfig();
-    if (taskConfig != null && 
taskConfig.isTaskTypeEnabled(SEGMENT_GENERATION_AND_PUSH_TASK_TYPE)) {
-      Map<String, String> taskTypeConfig = 
taskConfig.getConfigsForTaskType(SEGMENT_GENERATION_AND_PUSH_TASK_TYPE);
-      if (taskTypeConfig != null && taskTypeConfig.containsKey(SCHEDULE_KEY)) {
-        String cronExprStr = taskTypeConfig.get(SCHEDULE_KEY);
-        try {
-          CronScheduleBuilder.cronSchedule(cronExprStr);
-        } catch (Exception e) {
-          throw new IllegalStateException(
-              String.format("SegmentGenerationAndPushTask contains an invalid 
cron schedule: %s", cronExprStr), e);
+    if (taskConfig != null) {
+      for (Map.Entry<String, Map<String, String>> taskConfigEntry : 
taskConfig.getTaskTypeConfigsMap().entrySet()) {
+        Map<String, String> taskTypeConfig = taskConfigEntry.getValue();
+        if (taskTypeConfig != null && 
taskTypeConfig.containsKey(SCHEDULE_KEY)) {
+          String cronExprStr = taskTypeConfig.get(SCHEDULE_KEY);
+          try {
+            CronScheduleBuilder.cronSchedule(cronExprStr);
+          } catch (Exception e) {
+            throw new IllegalStateException(String.format(
+                "Task %s contains an invalid cron schedule: %s", 
taskConfigEntry.getKey(), cronExprStr), e);
+          }
+        }
+        // Task Specific validation for REALTIME_TO_OFFLINE_TASK_TYPE
+        // TODO task specific validate logic should directly call to 
PinotTaskGenerator API
+        if (taskConfigEntry.getKey().equals(REALTIME_TO_OFFLINE_TASK_TYPE)) {
+          if (taskTypeConfig != null) {
+            // check table is not upsert
+            Preconditions.checkState(tableConfig.getUpsertConfig() == null
+                || 
tableConfig.getUpsertConfig().getMode().equals(UpsertConfig.Mode.NONE),
+                "TableConfig cannot have upsert config when using 
RealtimeToOfflineTask!");
+            // check no malformed period
+            
TimeUtils.convertPeriodToMillis(taskTypeConfig.getOrDefault("bufferTimePeriod", 
"2d"));
+            
TimeUtils.convertPeriodToMillis(taskTypeConfig.getOrDefault("bucketTimePeriod", 
"1d"));
+            
TimeUtils.convertPeriodToMillis(taskTypeConfig.getOrDefault("roundBucketTimePeriod",
 "1s"));
+            // check mergeType is correct
+            Preconditions.checkState(ImmutableSet.of("concat", "rollup", 
"dedup").contains(
+                taskTypeConfig.getOrDefault("mergeType", "concat")),
+                "MergeType must be one of [concat, rollup, dedup]!");

Review comment:
       (Major) The check should be case-insensitive. We usually prefer all capitals for enum values.
   ```suggestion
               Preconditions.checkState(ImmutableSet.of("CONCAT", "ROLLUP", 
"DEDUP").contains(
                   taskTypeConfig.getOrDefault("mergeType", 
"CONCAT").toUpperCase()),
                   "MergeType must be one of [CONCAT, ROLLUP, DEDUP]!");
   ```

##########
File path: 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
##########
@@ -303,17 +306,51 @@ public static void validateIngestionConfig(TableConfig 
tableConfig, @Nullable Sc
     }
   }
 
-  private static void validateTaskConfigs(TableConfig tableConfig) {
+  static void validateTaskConfigs(TableConfig tableConfig, Schema schema) {

Review comment:
       Annotate with `@VisibleForTesting`, since its visibility was widened from `private`.

##########
File path: 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
##########
@@ -303,17 +306,51 @@ public static void validateIngestionConfig(TableConfig 
tableConfig, @Nullable Sc
     }
   }
 
-  private static void validateTaskConfigs(TableConfig tableConfig) {
+  static void validateTaskConfigs(TableConfig tableConfig, Schema schema) {
     TableTaskConfig taskConfig = tableConfig.getTaskConfig();
-    if (taskConfig != null && 
taskConfig.isTaskTypeEnabled(SEGMENT_GENERATION_AND_PUSH_TASK_TYPE)) {
-      Map<String, String> taskTypeConfig = 
taskConfig.getConfigsForTaskType(SEGMENT_GENERATION_AND_PUSH_TASK_TYPE);
-      if (taskTypeConfig != null && taskTypeConfig.containsKey(SCHEDULE_KEY)) {
-        String cronExprStr = taskTypeConfig.get(SCHEDULE_KEY);
-        try {
-          CronScheduleBuilder.cronSchedule(cronExprStr);
-        } catch (Exception e) {
-          throw new IllegalStateException(
-              String.format("SegmentGenerationAndPushTask contains an invalid 
cron schedule: %s", cronExprStr), e);
+    if (taskConfig != null) {
+      for (Map.Entry<String, Map<String, String>> taskConfigEntry : 
taskConfig.getTaskTypeConfigsMap().entrySet()) {
+        Map<String, String> taskTypeConfig = taskConfigEntry.getValue();
+        if (taskTypeConfig != null && 
taskTypeConfig.containsKey(SCHEDULE_KEY)) {
+          String cronExprStr = taskTypeConfig.get(SCHEDULE_KEY);
+          try {
+            CronScheduleBuilder.cronSchedule(cronExprStr);
+          } catch (Exception e) {
+            throw new IllegalStateException(String.format(
+                "Task %s contains an invalid cron schedule: %s", 
taskConfigEntry.getKey(), cronExprStr), e);
+          }
+        }
+        // Task Specific validation for REALTIME_TO_OFFLINE_TASK_TYPE
+        // TODO task specific validate logic should directly call to 
PinotTaskGenerator API
+        if (taskConfigEntry.getKey().equals(REALTIME_TO_OFFLINE_TASK_TYPE)) {
+          if (taskTypeConfig != null) {
+            // check table is not upsert
+            Preconditions.checkState(tableConfig.getUpsertConfig() == null
+                || 
tableConfig.getUpsertConfig().getMode().equals(UpsertConfig.Mode.NONE),
+                "TableConfig cannot have upsert config when using 
RealtimeToOfflineTask!");
+            // check no malformed period
+            
TimeUtils.convertPeriodToMillis(taskTypeConfig.getOrDefault("bufferTimePeriod", 
"2d"));

Review comment:
       Suggest wrapping this in a helper method and only validating the period when the config key exists. There is no need to validate the default value.

##########
File path: 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
##########
@@ -303,17 +306,51 @@ public static void validateIngestionConfig(TableConfig 
tableConfig, @Nullable Sc
     }
   }
 
-  private static void validateTaskConfigs(TableConfig tableConfig) {
+  static void validateTaskConfigs(TableConfig tableConfig, Schema schema) {
     TableTaskConfig taskConfig = tableConfig.getTaskConfig();
-    if (taskConfig != null && 
taskConfig.isTaskTypeEnabled(SEGMENT_GENERATION_AND_PUSH_TASK_TYPE)) {
-      Map<String, String> taskTypeConfig = 
taskConfig.getConfigsForTaskType(SEGMENT_GENERATION_AND_PUSH_TASK_TYPE);
-      if (taskTypeConfig != null && taskTypeConfig.containsKey(SCHEDULE_KEY)) {
-        String cronExprStr = taskTypeConfig.get(SCHEDULE_KEY);
-        try {
-          CronScheduleBuilder.cronSchedule(cronExprStr);
-        } catch (Exception e) {
-          throw new IllegalStateException(
-              String.format("SegmentGenerationAndPushTask contains an invalid 
cron schedule: %s", cronExprStr), e);
+    if (taskConfig != null) {
+      for (Map.Entry<String, Map<String, String>> taskConfigEntry : 
taskConfig.getTaskTypeConfigsMap().entrySet()) {
+        Map<String, String> taskTypeConfig = taskConfigEntry.getValue();
+        if (taskTypeConfig != null && 
taskTypeConfig.containsKey(SCHEDULE_KEY)) {
+          String cronExprStr = taskTypeConfig.get(SCHEDULE_KEY);
+          try {
+            CronScheduleBuilder.cronSchedule(cronExprStr);
+          } catch (Exception e) {
+            throw new IllegalStateException(String.format(
+                "Task %s contains an invalid cron schedule: %s", 
taskConfigEntry.getKey(), cronExprStr), e);
+          }
+        }
+        // Task Specific validation for REALTIME_TO_OFFLINE_TASK_TYPE
+        // TODO task specific validate logic should directly call to 
PinotTaskGenerator API
+        if (taskConfigEntry.getKey().equals(REALTIME_TO_OFFLINE_TASK_TYPE)) {
+          if (taskTypeConfig != null) {
+            // check table is not upsert
+            Preconditions.checkState(tableConfig.getUpsertConfig() == null
+                || 
tableConfig.getUpsertConfig().getMode().equals(UpsertConfig.Mode.NONE),
+                "TableConfig cannot have upsert config when using 
RealtimeToOfflineTask!");
+            // check no malformed period
+            
TimeUtils.convertPeriodToMillis(taskTypeConfig.getOrDefault("bufferTimePeriod", 
"2d"));
+            
TimeUtils.convertPeriodToMillis(taskTypeConfig.getOrDefault("bucketTimePeriod", 
"1d"));
+            
TimeUtils.convertPeriodToMillis(taskTypeConfig.getOrDefault("roundBucketTimePeriod",
 "1s"));
+            // check mergeType is correct
+            Preconditions.checkState(ImmutableSet.of("concat", "rollup", 
"dedup").contains(
+                taskTypeConfig.getOrDefault("mergeType", "concat")),
+                "MergeType must be one of [concat, rollup, dedup]!");
+            // check no mis-configured columns
+            Set<String> columnNames = schema.getColumnNames();
+            for (Map.Entry<String, String> entry : taskTypeConfig.entrySet()) {
+              if (entry.getKey().endsWith(".aggregationType")) {
+                Preconditions.checkState(
+                    columnNames.contains(StringUtils.removeEnd(entry.getKey(), 
".aggregationType")),
+                    String.format("Column \"%s\" not found in schema!", 
entry.getKey()));
+                Preconditions.checkState(ImmutableSet.of("sum", "max", 
"min").contains(entry.getValue()),

Review comment:
       Same here: the check should be case-insensitive.

##########
File path: 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
##########
@@ -303,17 +306,51 @@ public static void validateIngestionConfig(TableConfig 
tableConfig, @Nullable Sc
     }
   }
 
-  private static void validateTaskConfigs(TableConfig tableConfig) {
+  static void validateTaskConfigs(TableConfig tableConfig, Schema schema) {
     TableTaskConfig taskConfig = tableConfig.getTaskConfig();
-    if (taskConfig != null && 
taskConfig.isTaskTypeEnabled(SEGMENT_GENERATION_AND_PUSH_TASK_TYPE)) {
-      Map<String, String> taskTypeConfig = 
taskConfig.getConfigsForTaskType(SEGMENT_GENERATION_AND_PUSH_TASK_TYPE);
-      if (taskTypeConfig != null && taskTypeConfig.containsKey(SCHEDULE_KEY)) {
-        String cronExprStr = taskTypeConfig.get(SCHEDULE_KEY);
-        try {
-          CronScheduleBuilder.cronSchedule(cronExprStr);
-        } catch (Exception e) {
-          throw new IllegalStateException(
-              String.format("SegmentGenerationAndPushTask contains an invalid 
cron schedule: %s", cronExprStr), e);
+    if (taskConfig != null) {
+      for (Map.Entry<String, Map<String, String>> taskConfigEntry : 
taskConfig.getTaskTypeConfigsMap().entrySet()) {
+        Map<String, String> taskTypeConfig = taskConfigEntry.getValue();
+        if (taskTypeConfig != null && 
taskTypeConfig.containsKey(SCHEDULE_KEY)) {
+          String cronExprStr = taskTypeConfig.get(SCHEDULE_KEY);
+          try {
+            CronScheduleBuilder.cronSchedule(cronExprStr);
+          } catch (Exception e) {
+            throw new IllegalStateException(String.format(
+                "Task %s contains an invalid cron schedule: %s", 
taskConfigEntry.getKey(), cronExprStr), e);
+          }
+        }
+        // Task Specific validation for REALTIME_TO_OFFLINE_TASK_TYPE
+        // TODO task specific validate logic should directly call to 
PinotTaskGenerator API
+        if (taskConfigEntry.getKey().equals(REALTIME_TO_OFFLINE_TASK_TYPE)) {
+          if (taskTypeConfig != null) {
+            // check table is not upsert
+            Preconditions.checkState(tableConfig.getUpsertConfig() == null

Review comment:
       This is equivalent to the check on line 351, so we can remove the check here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to