Jackie-Jiang commented on a change in pull request #7523: URL: https://github.com/apache/pinot/pull/7523#discussion_r723759353
########## File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java ########## @@ -303,17 +306,51 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc } } - private static void validateTaskConfigs(TableConfig tableConfig) { + static void validateTaskConfigs(TableConfig tableConfig, Schema schema) { TableTaskConfig taskConfig = tableConfig.getTaskConfig(); - if (taskConfig != null && taskConfig.isTaskTypeEnabled(SEGMENT_GENERATION_AND_PUSH_TASK_TYPE)) { - Map<String, String> taskTypeConfig = taskConfig.getConfigsForTaskType(SEGMENT_GENERATION_AND_PUSH_TASK_TYPE); - if (taskTypeConfig != null && taskTypeConfig.containsKey(SCHEDULE_KEY)) { - String cronExprStr = taskTypeConfig.get(SCHEDULE_KEY); - try { - CronScheduleBuilder.cronSchedule(cronExprStr); - } catch (Exception e) { - throw new IllegalStateException( - String.format("SegmentGenerationAndPushTask contains an invalid cron schedule: %s", cronExprStr), e); + if (taskConfig != null) { + for (Map.Entry<String, Map<String, String>> taskConfigEntry : taskConfig.getTaskTypeConfigsMap().entrySet()) { + Map<String, String> taskTypeConfig = taskConfigEntry.getValue(); + if (taskTypeConfig != null && taskTypeConfig.containsKey(SCHEDULE_KEY)) { Review comment: ```suggestion if (taskTypeConfig.containsKey(SCHEDULE_KEY)) { ``` ########## File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java ########## @@ -303,17 +306,51 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc } } - private static void validateTaskConfigs(TableConfig tableConfig) { + static void validateTaskConfigs(TableConfig tableConfig, Schema schema) { TableTaskConfig taskConfig = tableConfig.getTaskConfig(); - if (taskConfig != null && taskConfig.isTaskTypeEnabled(SEGMENT_GENERATION_AND_PUSH_TASK_TYPE)) { - Map<String, String> taskTypeConfig = 
taskConfig.getConfigsForTaskType(SEGMENT_GENERATION_AND_PUSH_TASK_TYPE); - if (taskTypeConfig != null && taskTypeConfig.containsKey(SCHEDULE_KEY)) { - String cronExprStr = taskTypeConfig.get(SCHEDULE_KEY); - try { - CronScheduleBuilder.cronSchedule(cronExprStr); - } catch (Exception e) { - throw new IllegalStateException( - String.format("SegmentGenerationAndPushTask contains an invalid cron schedule: %s", cronExprStr), e); + if (taskConfig != null) { + for (Map.Entry<String, Map<String, String>> taskConfigEntry : taskConfig.getTaskTypeConfigsMap().entrySet()) { + Map<String, String> taskTypeConfig = taskConfigEntry.getValue(); + if (taskTypeConfig != null && taskTypeConfig.containsKey(SCHEDULE_KEY)) { + String cronExprStr = taskTypeConfig.get(SCHEDULE_KEY); + try { + CronScheduleBuilder.cronSchedule(cronExprStr); + } catch (Exception e) { + throw new IllegalStateException(String.format( + "Task %s contains an invalid cron schedule: %s", taskConfigEntry.getKey(), cronExprStr), e); + } + } + // Task Specific validation for REALTIME_TO_OFFLINE_TASK_TYPE + // TODO task specific validate logic should directly call to PinotTaskGenerator API + if (taskConfigEntry.getKey().equals(REALTIME_TO_OFFLINE_TASK_TYPE)) { + if (taskTypeConfig != null) { Review comment: Remove ########## File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java ########## @@ -303,17 +306,51 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc } } - private static void validateTaskConfigs(TableConfig tableConfig) { + static void validateTaskConfigs(TableConfig tableConfig, Schema schema) { TableTaskConfig taskConfig = tableConfig.getTaskConfig(); - if (taskConfig != null && taskConfig.isTaskTypeEnabled(SEGMENT_GENERATION_AND_PUSH_TASK_TYPE)) { - Map<String, String> taskTypeConfig = taskConfig.getConfigsForTaskType(SEGMENT_GENERATION_AND_PUSH_TASK_TYPE); - if (taskTypeConfig != null && 
taskTypeConfig.containsKey(SCHEDULE_KEY)) { - String cronExprStr = taskTypeConfig.get(SCHEDULE_KEY); - try { - CronScheduleBuilder.cronSchedule(cronExprStr); - } catch (Exception e) { - throw new IllegalStateException( - String.format("SegmentGenerationAndPushTask contains an invalid cron schedule: %s", cronExprStr), e); + if (taskConfig != null) { + for (Map.Entry<String, Map<String, String>> taskConfigEntry : taskConfig.getTaskTypeConfigsMap().entrySet()) { + Map<String, String> taskTypeConfig = taskConfigEntry.getValue(); + if (taskTypeConfig != null && taskTypeConfig.containsKey(SCHEDULE_KEY)) { + String cronExprStr = taskTypeConfig.get(SCHEDULE_KEY); + try { + CronScheduleBuilder.cronSchedule(cronExprStr); + } catch (Exception e) { + throw new IllegalStateException(String.format( + "Task %s contains an invalid cron schedule: %s", taskConfigEntry.getKey(), cronExprStr), e); + } + } + // Task Specific validation for REALTIME_TO_OFFLINE_TASK_TYPE + // TODO task specific validate logic should directly call to PinotTaskGenerator API + if (taskConfigEntry.getKey().equals(REALTIME_TO_OFFLINE_TASK_TYPE)) { + if (taskTypeConfig != null) { + // check table is not upsert + Preconditions.checkState(tableConfig.getUpsertConfig() == null + || tableConfig.getUpsertConfig().getMode().equals(UpsertConfig.Mode.NONE), + "TableConfig cannot have upsert config when using RealtimeToOfflineTask!"); + // check no malformed period + TimeUtils.convertPeriodToMillis(taskTypeConfig.getOrDefault("bufferTimePeriod", "2d")); + TimeUtils.convertPeriodToMillis(taskTypeConfig.getOrDefault("bucketTimePeriod", "1d")); + TimeUtils.convertPeriodToMillis(taskTypeConfig.getOrDefault("roundBucketTimePeriod", "1s")); + // check mergeType is correct + Preconditions.checkState(ImmutableSet.of("concat", "rollup", "dedup").contains( + taskTypeConfig.getOrDefault("mergeType", "concat")), + "MergeType must be one of [concat, rollup, dedup]!"); Review comment: (Major) The check should be case-insensitive. 
We usually prefer all-capital names for enum values. ```suggestion Preconditions.checkState(ImmutableSet.of("CONCAT", "ROLLUP", "DEDUP").contains( taskTypeConfig.getOrDefault("mergeType", "CONCAT").toUpperCase()), "MergeType must be one of [CONCAT, ROLLUP, DEDUP]!"); ``` ########## File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java ########## @@ -303,17 +306,51 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc } } - private static void validateTaskConfigs(TableConfig tableConfig) { + static void validateTaskConfigs(TableConfig tableConfig, Schema schema) { Review comment: Annotate with `@VisibleForTesting` ########## File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java ########## @@ -303,17 +306,51 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc } } - private static void validateTaskConfigs(TableConfig tableConfig) { + static void validateTaskConfigs(TableConfig tableConfig, Schema schema) { TableTaskConfig taskConfig = tableConfig.getTaskConfig(); - if (taskConfig != null && taskConfig.isTaskTypeEnabled(SEGMENT_GENERATION_AND_PUSH_TASK_TYPE)) { - Map<String, String> taskTypeConfig = taskConfig.getConfigsForTaskType(SEGMENT_GENERATION_AND_PUSH_TASK_TYPE); - if (taskTypeConfig != null && taskTypeConfig.containsKey(SCHEDULE_KEY)) { - String cronExprStr = taskTypeConfig.get(SCHEDULE_KEY); - try { - CronScheduleBuilder.cronSchedule(cronExprStr); - } catch (Exception e) { - throw new IllegalStateException( - String.format("SegmentGenerationAndPushTask contains an invalid cron schedule: %s", cronExprStr), e); + if (taskConfig != null) { + for (Map.Entry<String, Map<String, String>> taskConfigEntry : taskConfig.getTaskTypeConfigsMap().entrySet()) { + Map<String, String> taskTypeConfig = taskConfigEntry.getValue(); + if (taskTypeConfig != null && taskTypeConfig.containsKey(SCHEDULE_KEY)) { + String cronExprStr = 
taskTypeConfig.get(SCHEDULE_KEY); + try { + CronScheduleBuilder.cronSchedule(cronExprStr); + } catch (Exception e) { + throw new IllegalStateException(String.format( + "Task %s contains an invalid cron schedule: %s", taskConfigEntry.getKey(), cronExprStr), e); + } + } + // Task Specific validation for REALTIME_TO_OFFLINE_TASK_TYPE + // TODO task specific validate logic should directly call to PinotTaskGenerator API + if (taskConfigEntry.getKey().equals(REALTIME_TO_OFFLINE_TASK_TYPE)) { + if (taskTypeConfig != null) { + // check table is not upsert + Preconditions.checkState(tableConfig.getUpsertConfig() == null + || tableConfig.getUpsertConfig().getMode().equals(UpsertConfig.Mode.NONE), + "TableConfig cannot have upsert config when using RealtimeToOfflineTask!"); + // check no malformed period + TimeUtils.convertPeriodToMillis(taskTypeConfig.getOrDefault("bufferTimePeriod", "2d")); Review comment: Suggest wrapping this into a helper method, and only check when the config exists. No need to validate the default value ########## File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java ########## @@ -303,17 +306,51 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc } } - private static void validateTaskConfigs(TableConfig tableConfig) { + static void validateTaskConfigs(TableConfig tableConfig, Schema schema) { TableTaskConfig taskConfig = tableConfig.getTaskConfig(); - if (taskConfig != null && taskConfig.isTaskTypeEnabled(SEGMENT_GENERATION_AND_PUSH_TASK_TYPE)) { - Map<String, String> taskTypeConfig = taskConfig.getConfigsForTaskType(SEGMENT_GENERATION_AND_PUSH_TASK_TYPE); - if (taskTypeConfig != null && taskTypeConfig.containsKey(SCHEDULE_KEY)) { - String cronExprStr = taskTypeConfig.get(SCHEDULE_KEY); - try { - CronScheduleBuilder.cronSchedule(cronExprStr); - } catch (Exception e) { - throw new IllegalStateException( - String.format("SegmentGenerationAndPushTask contains an invalid 
cron schedule: %s", cronExprStr), e); + if (taskConfig != null) { + for (Map.Entry<String, Map<String, String>> taskConfigEntry : taskConfig.getTaskTypeConfigsMap().entrySet()) { + Map<String, String> taskTypeConfig = taskConfigEntry.getValue(); + if (taskTypeConfig != null && taskTypeConfig.containsKey(SCHEDULE_KEY)) { + String cronExprStr = taskTypeConfig.get(SCHEDULE_KEY); + try { + CronScheduleBuilder.cronSchedule(cronExprStr); + } catch (Exception e) { + throw new IllegalStateException(String.format( + "Task %s contains an invalid cron schedule: %s", taskConfigEntry.getKey(), cronExprStr), e); + } + } + // Task Specific validation for REALTIME_TO_OFFLINE_TASK_TYPE + // TODO task specific validate logic should directly call to PinotTaskGenerator API + if (taskConfigEntry.getKey().equals(REALTIME_TO_OFFLINE_TASK_TYPE)) { + if (taskTypeConfig != null) { + // check table is not upsert + Preconditions.checkState(tableConfig.getUpsertConfig() == null + || tableConfig.getUpsertConfig().getMode().equals(UpsertConfig.Mode.NONE), + "TableConfig cannot have upsert config when using RealtimeToOfflineTask!"); + // check no malformed period + TimeUtils.convertPeriodToMillis(taskTypeConfig.getOrDefault("bufferTimePeriod", "2d")); + TimeUtils.convertPeriodToMillis(taskTypeConfig.getOrDefault("bucketTimePeriod", "1d")); + TimeUtils.convertPeriodToMillis(taskTypeConfig.getOrDefault("roundBucketTimePeriod", "1s")); + // check mergeType is correct + Preconditions.checkState(ImmutableSet.of("concat", "rollup", "dedup").contains( + taskTypeConfig.getOrDefault("mergeType", "concat")), + "MergeType must be one of [concat, rollup, dedup]!"); + // check no mis-configured columns + Set<String> columnNames = schema.getColumnNames(); + for (Map.Entry<String, String> entry : taskTypeConfig.entrySet()) { + if (entry.getKey().endsWith(".aggregationType")) { + Preconditions.checkState( + columnNames.contains(StringUtils.removeEnd(entry.getKey(), ".aggregationType")), + String.format("Column 
\"%s\" not found in schema!", entry.getKey())); + Preconditions.checkState(ImmutableSet.of("sum", "max", "min").contains(entry.getValue()), Review comment: Same here, case-insensitive check ########## File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java ########## @@ -303,17 +306,51 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc } } - private static void validateTaskConfigs(TableConfig tableConfig) { + static void validateTaskConfigs(TableConfig tableConfig, Schema schema) { TableTaskConfig taskConfig = tableConfig.getTaskConfig(); - if (taskConfig != null && taskConfig.isTaskTypeEnabled(SEGMENT_GENERATION_AND_PUSH_TASK_TYPE)) { - Map<String, String> taskTypeConfig = taskConfig.getConfigsForTaskType(SEGMENT_GENERATION_AND_PUSH_TASK_TYPE); - if (taskTypeConfig != null && taskTypeConfig.containsKey(SCHEDULE_KEY)) { - String cronExprStr = taskTypeConfig.get(SCHEDULE_KEY); - try { - CronScheduleBuilder.cronSchedule(cronExprStr); - } catch (Exception e) { - throw new IllegalStateException( - String.format("SegmentGenerationAndPushTask contains an invalid cron schedule: %s", cronExprStr), e); + if (taskConfig != null) { + for (Map.Entry<String, Map<String, String>> taskConfigEntry : taskConfig.getTaskTypeConfigsMap().entrySet()) { + Map<String, String> taskTypeConfig = taskConfigEntry.getValue(); + if (taskTypeConfig != null && taskTypeConfig.containsKey(SCHEDULE_KEY)) { + String cronExprStr = taskTypeConfig.get(SCHEDULE_KEY); + try { + CronScheduleBuilder.cronSchedule(cronExprStr); + } catch (Exception e) { + throw new IllegalStateException(String.format( + "Task %s contains an invalid cron schedule: %s", taskConfigEntry.getKey(), cronExprStr), e); + } + } + // Task Specific validation for REALTIME_TO_OFFLINE_TASK_TYPE + // TODO task specific validate logic should directly call to PinotTaskGenerator API + if (taskConfigEntry.getKey().equals(REALTIME_TO_OFFLINE_TASK_TYPE)) { + if 
(taskTypeConfig != null) { + // check table is not upsert + Preconditions.checkState(tableConfig.getUpsertConfig() == null Review comment: This is equivalent to the check on line 351. We can remove the check here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org