This is an automated email from the ASF dual-hosted git repository. manishswaminathan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 381097dc1b Added support to perform task validations for plug-in tasks (#14340) 381097dc1b is described below commit 381097dc1ba990839f43fe40fc9144fb9c879769 Author: Ragesh Rajagopalan <ragesh.rajagopa...@gmail.com> AuthorDate: Thu Nov 7 22:08:16 2024 -0800 Added support to perform task validations for plug-in tasks (#14340) * Added support to perform task validations for plug-in tasks * trigger build2 --- .../api/resources/PinotTableRestletResource.java | 8 + .../api/resources/TableConfigsRestletResource.java | 7 + .../core/minion/generator/PinotTaskGenerator.java | 8 + .../pinot/controller/util/TaskConfigUtils.java | 95 ++++++++++ .../api/PinotTableRestletResourceTest.java | 42 ++++- .../core/minion/PinotTaskManagerStatelessTest.java | 50 ++++- .../pinot/controller/util/TaskConfigUtilsTest.java | 122 +++++++++++++ .../apache/pinot/core/common/MinionConstants.java | 24 +++ .../RealtimeToOfflineSegmentsTaskGenerator.java | 46 +++++ .../UpsertCompactionTaskGenerator.java | 47 +++++ ...RealtimeToOfflineSegmentsTaskGeneratorTest.java | 137 ++++++++++++++ .../UpsertCompactionTaskGeneratorTest.java | 58 ++++++ .../segment/local/utils/TableConfigUtils.java | 122 ------------- .../segment/local/utils/TableConfigUtilsTest.java | 203 --------------------- 14 files changed, 635 insertions(+), 334 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java index 0320e149d7..040c57c885 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java @@ -94,6 +94,7 @@ import org.apache.pinot.controller.api.exception.TableAlreadyExistsException; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse; import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager; +import org.apache.pinot.controller.helix.core.minion.PinotTaskManager; import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig; import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants; import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult; @@ -105,6 +106,7 @@ import org.apache.pinot.controller.tuner.TableConfigTunerUtils; import org.apache.pinot.controller.util.CompletionServiceHelper; import org.apache.pinot.controller.util.TableIngestionStatusHelper; import org.apache.pinot.controller.util.TableMetadataReader; +import org.apache.pinot.controller.util.TaskConfigUtils; import org.apache.pinot.core.auth.Actions; import org.apache.pinot.core.auth.Authorize; import org.apache.pinot.core.auth.ManualAuthorization; @@ -170,6 +172,9 @@ public class PinotTableRestletResource { @Inject PinotHelixTaskResourceManager _pinotHelixTaskResourceManager; + @Inject + PinotTaskManager _pinotTaskManager; + @Inject ControllerConf _controllerConf; @@ -234,6 +239,7 @@ public class PinotTableRestletResource { TableConfigUtils.ensureMinReplicas(tableConfig, _controllerConf.getDefaultTableMinReplicas()); TableConfigUtils.ensureStorageQuotaConstraints(tableConfig, _controllerConf.getDimTableMaxSize()); checkHybridTableConfig(TableNameBuilder.extractRawTableName(tableNameWithType), tableConfig); + TaskConfigUtils.validateTaskConfigs(tableConfig, _pinotTaskManager, typesToSkip); } catch (Exception e) { throw new InvalidTableConfigException(e); } @@ -508,6 +514,7 @@ public class PinotTableRestletResource { TableConfigUtils.ensureMinReplicas(tableConfig, _controllerConf.getDefaultTableMinReplicas()); TableConfigUtils.ensureStorageQuotaConstraints(tableConfig, _controllerConf.getDimTableMaxSize()); checkHybridTableConfig(TableNameBuilder.extractRawTableName(tableNameWithType), tableConfig); + TaskConfigUtils.validateTaskConfigs(tableConfig, _pinotTaskManager, typesToSkip); } catch (Exception e) { throw new InvalidTableConfigException(e); } @@ -568,6 +575,7 @@ public class PinotTableRestletResource { throw new SchemaNotFoundException("Got empty schema"); } TableConfigUtils.validate(tableConfig, schema, typesToSkip); + TaskConfigUtils.validateTaskConfigs(tableConfig, _pinotTaskManager, typesToSkip); ObjectNode tableConfigValidateStr = JsonUtils.newObjectNode(); if (tableConfig.getTableType() == TableType.OFFLINE) { tableConfigValidateStr.set(TableType.OFFLINE.name(), tableConfig.toJsonNode()); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigsRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigsRestletResource.java index df543614b9..5d55df6095 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigsRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigsRestletResource.java @@ -61,7 +61,9 @@ import org.apache.pinot.controller.api.exception.ControllerApplicationException; import org.apache.pinot.controller.api.exception.InvalidTableConfigException; import org.apache.pinot.controller.api.exception.TableAlreadyExistsException; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.helix.core.minion.PinotTaskManager; import org.apache.pinot.controller.tuner.TableConfigTunerUtils; +import org.apache.pinot.controller.util.TaskConfigUtils; import org.apache.pinot.core.auth.Actions; import org.apache.pinot.core.auth.Authorize; import org.apache.pinot.core.auth.ManualAuthorization; @@ -102,6 +104,9 @@ public class TableConfigsRestletResource { @Inject PinotHelixResourceManager _pinotHelixResourceManager; + @Inject + PinotTaskManager _pinotTaskManager; + @Inject ControllerConf _controllerConf; @@ -457,6 +462,7 @@ public class TableConfigsRestletResource { "Name in 'offline' table config: %s must be equal to 'tableName': %s", offlineRawTableName, rawTableName); TableConfigUtils.validateTableName(offlineTableConfig); TableConfigUtils.validate(offlineTableConfig, schema, typesToSkip); + TaskConfigUtils.validateTaskConfigs(tableConfigs.getOffline(), _pinotTaskManager, typesToSkip); } if (realtimeTableConfig != null) { String realtimeRawTableName = DatabaseUtils.translateTableName( @@ -465,6 +471,7 @@ public class TableConfigsRestletResource { "Name in 'realtime' table config: %s must be equal to 'tableName': %s", realtimeRawTableName, rawTableName); TableConfigUtils.validateTableName(realtimeTableConfig); TableConfigUtils.validate(realtimeTableConfig, schema, typesToSkip); + TaskConfigUtils.validateTaskConfigs(tableConfigs.getRealtime(), _pinotTaskManager, typesToSkip); } if (offlineTableConfig != null && realtimeTableConfig != null) { TableConfigUtils.verifyHybridTableConfigs(rawTableName, offlineTableConfig, realtimeTableConfig); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/PinotTaskGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/PinotTaskGenerator.java index fd2a4615ed..9be76f253d 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/PinotTaskGenerator.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/PinotTaskGenerator.java @@ -99,4 +99,12 @@ public interface PinotTaskGenerator { default String getMinionInstanceTag(TableConfig tableConfig) { return CommonConstants.Helix.UNTAGGED_MINION_INSTANCE; } + + /** + * Performs task type specific validations for the given task type. + * @param tableConfig The table configuration that is getting added/updated/validated. + * @param taskConfigs The task type specific task configuration to be validated. + */ + default void validateTaskConfigs(TableConfig tableConfig, Map<String, String> taskConfigs) { + } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TaskConfigUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TaskConfigUtils.java new file mode 100644 index 0000000000..059908ea8d --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TaskConfigUtils.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.util; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import java.util.Map; +import org.apache.pinot.controller.helix.core.minion.PinotTaskManager; +import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator; +import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorRegistry; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableTaskConfig; +import org.quartz.CronScheduleBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class TaskConfigUtils { + private static final Logger LOGGER = LoggerFactory.getLogger(TaskConfigUtils.class); + private static final String SCHEDULE_KEY = "schedule"; + private static final String ALL_VALIDATION_TYPE = "ALL"; + private static final String TASK_VALIDATION_TYPE = "TASK"; + + private TaskConfigUtils() { + } + + public static void validateTaskConfigs(TableConfig tableConfig, PinotTaskManager pinotTaskManager, + String validationTypesToSkip) { + if (tableConfig == null || tableConfig.getTaskConfig() == null) { + return; + } + if (ALL_VALIDATION_TYPE.equalsIgnoreCase(validationTypesToSkip) || TASK_VALIDATION_TYPE.equalsIgnoreCase( + validationTypesToSkip)) { + LOGGER.info("Skipping task validation as validationTypesToSkip is set to: {}", validationTypesToSkip); + return; + } + TableTaskConfig taskConfig = tableConfig.getTaskConfig(); + for (Map.Entry<String, Map<String, String>> taskConfigEntry : taskConfig.getTaskTypeConfigsMap().entrySet()) { + String taskType = taskConfigEntry.getKey(); + TaskGeneratorRegistry taskRegistry = pinotTaskManager.getTaskGeneratorRegistry(); + if (taskRegistry != null) { + PinotTaskGenerator taskGenerator = taskRegistry.getTaskGenerator(taskType); + if (taskGenerator != null) { + Map<String, String> taskConfigs = taskConfigEntry.getValue(); + doCommonTaskValidations(tableConfig, taskType, taskConfigs); + taskGenerator.validateTaskConfigs(tableConfig, taskConfigs); + } else { + throw new RuntimeException(String.format("Task generator not found for task type: %s, while validating table " + + "configs for table: %s", taskType, tableConfig.getTableName())); + } + } + } + } + + @VisibleForTesting + static void doCommonTaskValidations(TableConfig tableConfig, String taskType, + Map<String, String> taskConfigs) { + Preconditions.checkNotNull(taskConfigs, String.format("Task configuration for %s cannot be null.", taskType)); + // Schedule key for task config has to be set. + if (taskConfigs.containsKey(SCHEDULE_KEY)) { + String cronExprStr = taskConfigs.get(SCHEDULE_KEY); + try { + CronScheduleBuilder.cronSchedule(cronExprStr); + } catch (Exception e) { + throw new IllegalStateException( + String.format("Task %s contains an invalid cron schedule: %s", taskType, cronExprStr), e); + } + } + boolean isAllowDownloadFromServer = Boolean.parseBoolean( + taskConfigs.getOrDefault(TableTaskConfig.MINION_ALLOW_DOWNLOAD_FROM_SERVER, + String.valueOf(TableTaskConfig.DEFAULT_MINION_ALLOW_DOWNLOAD_FROM_SERVER))); + if (isAllowDownloadFromServer) { + Preconditions.checkState(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme() != null, + String.format("Table %s has task %s with allowDownloadFromServer set to true, but " + + "peerSegmentDownloadScheme is not set in the table config", tableConfig.getTableName(), + taskType)); + } + } +} diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java index 54644c26aa..fdb8c09072 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java @@ -30,8 +30,10 @@ import java.util.List; import java.util.Map; import org.apache.pinot.controller.helix.ControllerTest; import org.apache.pinot.controller.helix.core.minion.PinotTaskManager; +import org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator; import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult; import org.apache.pinot.core.common.MinionConstants; +import org.apache.pinot.core.minion.PinotTaskConfig; import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils; import org.apache.pinot.spi.config.table.QuotaConfig; import org.apache.pinot.spi.config.table.TableConfig; @@ -64,6 +66,7 @@ public class PinotTableRestletResourceTest extends ControllerTest { public void setUp() throws Exception { DEFAULT_INSTANCE.setupSharedStateAndValidate(); + registerMinionTasks(); _createTableUrl = DEFAULT_INSTANCE.getControllerRequestURLBuilder().forTableCreate(); _offlineBuilder.setTableName(OFFLINE_TABLE_NAME).setTimeColumnName("timeColumn").setTimeType("DAYS") .setRetentionTimeUnit("DAYS").setRetentionTimeValue("5"); @@ -77,6 +80,43 @@ public class PinotTableRestletResourceTest extends ControllerTest { .setStreamConfigs(streamConfig.getStreamConfigsMap()); } + private void registerMinionTasks() { + PinotTaskManager taskManager = DEFAULT_INSTANCE.getControllerStarter().getTaskManager(); + taskManager.registerTaskGenerator(new BaseTaskGenerator() { + @Override + public String getTaskType() { + return MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE; + } + + @Override + public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) { + return List.of(new PinotTaskConfig(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE, new HashMap<>())); + } + }); + taskManager.registerTaskGenerator(new BaseTaskGenerator() { + @Override + public String getTaskType() { + return MinionConstants.MergeRollupTask.TASK_TYPE; + } + + @Override + public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) { + return List.of(new PinotTaskConfig(MinionConstants.MergeRollupTask.TASK_TYPE, new HashMap<>())); + } + }); + taskManager.registerTaskGenerator(new BaseTaskGenerator() { + @Override + public String getTaskType() { + return MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE; + } + + @Override + public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) { + return List.of(new PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, new HashMap<>())); + } + }); + } + @Test public void testCreateTable() throws Exception { @@ -708,6 +748,6 @@ public class PinotTableRestletResourceTest extends ControllerTest { @AfterClass public void tearDown() { - DEFAULT_INSTANCE.cleanup(); + DEFAULT_INSTANCE.stopSharedTestSetup(); } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerStatelessTest.java index e4405502ef..f224f4cd56 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerStatelessTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerStatelessTest.java @@ -19,6 +19,10 @@ package org.apache.pinot.controller.helix.core.minion; import com.google.common.collect.ImmutableMap; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -26,6 +30,8 @@ import java.util.Set; import java.util.function.Function; import java.util.function.Predicate; import org.apache.helix.task.TaskState; +import org.apache.pinot.common.exception.HttpErrorStatusException; +import org.apache.pinot.common.utils.http.HttpClient; import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.helix.ControllerTest; import org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator; @@ -103,7 +109,7 @@ public class PinotTaskManagerStatelessTest extends ControllerTest { new TableTaskConfig( ImmutableMap.of("SegmentGenerationAndPushTask", ImmutableMap.of("schedule", "0 * * ? * * *")))).build(); waitForEVToDisappear(tableConfig.getTableName()); - addTableConfig(tableConfig); + addTableConfig(tableConfig, "TASK"); waitForJobGroupNames(_controllerStarter.getTaskManager(), jgn -> jgn.size() == 1 && jgn.contains(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE), "JobGroupNames should have SegmentGenerationAndPushTask only"); @@ -139,7 +145,7 @@ public class PinotTaskManagerStatelessTest extends ControllerTest { new TableTaskConfig( ImmutableMap.of(segmentGenerationAndPushTask, ImmutableMap.of("schedule", "0 */10 * ? * * *")))).build(); waitForEVToDisappear(tableConfig.getTableName()); - addTableConfig(tableConfig); + addTableConfig(tableConfig, "TASK"); waitForJobGroupNames(taskManager, jgn -> jgn.size() == 1 && jgn.contains(segmentGenerationAndPushTask), "JobGroupNames should have SegmentGenerationAndPushTask only"); validateJob(segmentGenerationAndPushTask, "0 */10 * ? * * *"); @@ -233,7 +239,7 @@ public class PinotTaskManagerStatelessTest extends ControllerTest { new TableTaskConfig( ImmutableMap.of("SegmentGenerationAndPushTask", ImmutableMap.of("schedule", "0 */10 * ? * * *")))).build(); waitForEVToDisappear(tableConfig.getTableName()); - addTableConfig(tableConfig); + addTableConfig(tableConfig, "TASK"); waitForJobGroupNames(_controllerStarter.getTaskManager(), jgn -> jgn.size() == 1 && jgn.contains(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE), "JobGroupNames should have SegmentGenerationAndPushTask only"); @@ -242,7 +248,7 @@ public class PinotTaskManagerStatelessTest extends ControllerTest { // 2. Update table to new schedule tableConfig.setTaskConfig(new TableTaskConfig( ImmutableMap.of("SegmentGenerationAndPushTask", ImmutableMap.of("schedule", "0 */20 * ? * * *")))); - updateTableConfig(tableConfig); + updateTableConfig(tableConfig, "TASK"); waitForJobGroupNames(_controllerStarter.getTaskManager(), jgn -> jgn.size() == 1 && jgn.contains(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE), "JobGroupNames should have SegmentGenerationAndPushTask only"); @@ -252,7 +258,7 @@ public class PinotTaskManagerStatelessTest extends ControllerTest { tableConfig.setTaskConfig(new TableTaskConfig( ImmutableMap.of("SegmentGenerationAndPushTask", ImmutableMap.of("schedule", "0 */30 * ? * * *"), "MergeRollupTask", ImmutableMap.of("schedule", "0 */10 * ? * * *")))); - updateTableConfig(tableConfig); + updateTableConfig(tableConfig, "TASK"); waitForJobGroupNames(_controllerStarter.getTaskManager(), jgn -> jgn.size() == 2 && jgn.contains(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE) && jgn.contains( MinionConstants.MergeRollupTask.TASK_TYPE), @@ -263,7 +269,7 @@ public class PinotTaskManagerStatelessTest extends ControllerTest { // 4. Remove one task from the table tableConfig.setTaskConfig( new TableTaskConfig(ImmutableMap.of("MergeRollupTask", ImmutableMap.of("schedule", "0 */10 * ? * * *")))); - updateTableConfig(tableConfig); + updateTableConfig(tableConfig, "TASK"); waitForJobGroupNames(_controllerStarter.getTaskManager(), jgn -> jgn.size() == 1 && jgn.contains(MinionConstants.MergeRollupTask.TASK_TYPE), "JobGroupNames should have MergeRollupTask only"); @@ -300,7 +306,7 @@ public class PinotTaskManagerStatelessTest extends ControllerTest { new TableTaskConfig( ImmutableMap.of("SegmentGenerationAndPushTask", ImmutableMap.of("schedule", "0 */10 * ? * * *")))).build(); waitForEVToDisappear(tableConfig.getTableName()); - addTableConfig(tableConfig); + addTableConfig(tableConfig, "TASK"); waitForJobGroupNames(_controllerStarter.getTaskManager(), jgn -> jgn.size() == 1 && jgn.contains(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE), "JobGroupNames should have SegmentGenerationAndPushTask only"); @@ -321,7 +327,7 @@ public class PinotTaskManagerStatelessTest extends ControllerTest { tableConfig.setTaskConfig(new TableTaskConfig( ImmutableMap.of("SegmentGenerationAndPushTask", ImmutableMap.of("schedule", "0 */10 * ? * * *"), "MergeRollupTask", ImmutableMap.of("schedule", "0 */20 * ? * * *")))); - updateTableConfig(tableConfig); + updateTableConfig(tableConfig, "TASK"); // Task is put into table config. TableConfig tableConfigAfterRestart = @@ -407,6 +413,34 @@ public class PinotTaskManagerStatelessTest extends ControllerTest { }, TIMEOUT_IN_MS, 500L, "Cron expression didn't change to " + cronExpression); } + private void addTableConfig(TableConfig tableConfig, String validationTypesToSkip) + throws IOException { + String createTableUriStr = + String.format(_controllerRequestURLBuilder.forTableCreate() + "?validationTypesToSkip=%s", + validationTypesToSkip); + try { + HttpClient.wrapAndThrowHttpException( + _httpClient.sendJsonPostRequest(new URI(createTableUriStr), tableConfig.toJsonString(), + Collections.emptyMap())); + } catch (HttpErrorStatusException | URISyntaxException e) { + throw new IOException(e); + } + } + + private void updateTableConfig(TableConfig tableConfig, String validationTypesToSkip) + throws IOException { + String updateTableUriStr = String.format( + _controllerRequestURLBuilder.forUpdateTableConfig(tableConfig.getTableName()) + "?validationTypesToSkip=%s", + validationTypesToSkip); + try { + HttpClient.wrapAndThrowHttpException( + _httpClient.sendJsonPutRequest(new URI(updateTableUriStr), tableConfig.toJsonString(), + Collections.emptyMap())); + } catch (HttpErrorStatusException | URISyntaxException e) { + throw new IOException(e); + } + } + @AfterClass public void tearDown() { stopZk(); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/util/TaskConfigUtilsTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/util/TaskConfigUtilsTest.java new file mode 100644 index 0000000000..000bf9826c --- /dev/null +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/util/TaskConfigUtilsTest.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.util; + +import com.google.common.collect.ImmutableMap; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.pinot.controller.helix.core.minion.PinotTaskManager; +import org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator; +import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator; +import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorRegistry; +import org.apache.pinot.core.minion.PinotTaskConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableTaskConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.when; + + +public class TaskConfigUtilsTest { + PinotTaskManager _mockTaskManager; + TaskGeneratorRegistry _mockTaskRegistry; + PinotTaskGenerator _taskGenerator; + private static final String TEST_TASK_TYPE = "myTask"; + private static final String TEST_TABLE_NAME = "myTable"; + + @BeforeMethod + public void setup() { + _mockTaskManager = Mockito.mock(PinotTaskManager.class); + _mockTaskRegistry = Mockito.mock(TaskGeneratorRegistry.class); + + _taskGenerator = new BaseTaskGenerator() { + + @Override + public String getTaskType() { + return TEST_TASK_TYPE; + } + + @Override + public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) { + return List.of(new PinotTaskConfig(TEST_TASK_TYPE, new HashMap<>())); + } + + @Override + public void validateTaskConfigs(TableConfig tableConfig, Map<String, String> taskConfigs) { + throw new RuntimeException("TableConfig validation failed"); + } + }; + + when(_mockTaskRegistry.getTaskGenerator(TEST_TASK_TYPE)).thenReturn(_taskGenerator); + when(_mockTaskManager.getTaskGeneratorRegistry()).thenReturn(_mockTaskRegistry); + } + + @Test (expectedExceptions = RuntimeException.class) + public void testValidateTableTaskConfigsValidationException() { + TableTaskConfig tableTaskConfig = + new TableTaskConfig(ImmutableMap.of(TEST_TASK_TYPE, ImmutableMap.of("schedule", "0 */10 * ? * * *"))); + TableConfig tableConfig = + new TableConfigBuilder(TableType.OFFLINE).setTableName(TEST_TABLE_NAME).setTaskConfig(tableTaskConfig).build(); + TaskConfigUtils.validateTaskConfigs(tableConfig, _mockTaskManager, null); + } + + @Test (expectedExceptions = RuntimeException.class) + public void testValidateTableTaskConfigsUnknownTaskType() { + TableTaskConfig tableTaskConfig = + new TableTaskConfig(ImmutableMap.of("otherTask", ImmutableMap.of("schedule", "0 */10 * ? * * *"))); + TableConfig tableConfig = + new TableConfigBuilder(TableType.OFFLINE).setTableName(TEST_TABLE_NAME).setTaskConfig(tableTaskConfig).build(); + TaskConfigUtils.validateTaskConfigs(tableConfig, _mockTaskManager, null); + } + + @Test + public void testCommonTaskValidations() { + // invalid schedule + HashMap<String, String> invalidScheduleConfig = new HashMap<>(); + invalidScheduleConfig.put("schedule", "invalidSchedule"); + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TEST_TABLE_NAME).setTaskConfig( + new TableTaskConfig(ImmutableMap.of(TEST_TASK_TYPE, invalidScheduleConfig))).build(); + + try { + TaskConfigUtils.doCommonTaskValidations(tableConfig, TEST_TASK_TYPE, invalidScheduleConfig); + Assert.fail(); + } catch (IllegalStateException e) { + Assert.assertTrue(e.getMessage().contains("contains an invalid cron schedule")); + } + + // invalid allowDownloadFromServer config + HashMap<String, String> invalidAllowDownloadFromServerConfig = new HashMap<>(); + invalidAllowDownloadFromServerConfig.put("allowDownloadFromServer", "true"); + tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TEST_TABLE_NAME).setTaskConfig( + new TableTaskConfig(ImmutableMap.of(TEST_TASK_TYPE, invalidAllowDownloadFromServerConfig))).build(); + try { + TaskConfigUtils.doCommonTaskValidations(tableConfig, TEST_TASK_TYPE, invalidAllowDownloadFromServerConfig); + Assert.fail(); + } catch (IllegalStateException e) { + Assert.assertTrue(e.getMessage().contains( + "allowDownloadFromServer set to true, but " + "peerSegmentDownloadScheme is not set in the table config")); + } + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java index 9b1b89b4b7..6ad497a4a3 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java @@ -18,6 +18,12 @@ */ package org.apache.pinot.core.common; +import java.util.EnumSet; +import org.apache.pinot.segment.spi.AggregationFunctionType; + +import static org.apache.pinot.segment.spi.AggregationFunctionType.*; + + public class MinionConstants { private MinionConstants() { } @@ -138,6 +144,19 @@ public class MinionConstants { @Deprecated // Replaced by MERGE_TYPE_KEY public static final String COLLECTOR_TYPE_KEY = "collectorType"; + + public static final String BUCKET_TIME_PERIOD_KEY = "bucketTimePeriod"; + public static final String BUFFER_TIME_PERIOD_KEY = "bufferTimePeriod"; + public static final String ROUND_BUCKET_TIME_PERIOD_KEY = "roundBucketTimePeriod"; + public static final String MERGE_TYPE_KEY = "mergeType"; + public static final String AGGREGATION_TYPE_KEY_SUFFIX = ".aggregationType"; + + public final static EnumSet<AggregationFunctionType> AVAILABLE_CORE_VALUE_AGGREGATORS = + EnumSet.of(MIN, MAX, SUM, DISTINCTCOUNTHLL, DISTINCTCOUNTRAWHLL, DISTINCTCOUNTTHETASKETCH, + DISTINCTCOUNTRAWTHETASKETCH, DISTINCTCOUNTTUPLESKETCH, DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH, + SUMVALUESINTEGERSUMTUPLESKETCH, AVGVALUEINTEGERSUMTUPLESKETCH, DISTINCTCOUNTHLLPLUS, + DISTINCTCOUNTRAWHLLPLUS, DISTINCTCOUNTCPCSKETCH, DISTINCTCOUNTRAWCPCSKETCH, DISTINCTCOUNTULL, + DISTINCTCOUNTRAWULL); } // Generate segment and push to controller based on batch ingestion configs @@ -170,6 +189,11 @@ public class MinionConstants { */ public static final String VALID_DOC_IDS_TYPE = "validDocIdsType"; + /** + * Value for the key VALID_DOC_IDS_TYPE + */ + public static final String SNAPSHOT = "snapshot"; + /** * number of segments to query in one batch to fetch valid doc id metadata, by default 500 */ diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java index 0323e26747..73ff19ebef 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java @@ -19,6 +19,7 @@ package org.apache.pinot.plugin.minion.tasks.realtimetoofflinesegments; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -37,11 +38,15 @@ import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtil import org.apache.pinot.core.common.MinionConstants; import org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask; import org.apache.pinot.core.minion.PinotTaskConfig; +import org.apache.pinot.core.segment.processing.framework.MergeType; import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils; +import org.apache.pinot.segment.spi.AggregationFunctionType; import org.apache.pinot.spi.annotations.minion.TaskGenerator; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableTaskConfig; import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.config.table.UpsertConfig; +import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.utils.CommonConstants.Segment; import org.apache.pinot.spi.utils.TimeUtils; import org.slf4j.Logger; @@ -314,4 +319,45 @@ public class RealtimeToOfflineSegmentsTaskGenerator extends BaseTaskGenerator { } return realtimeToOfflineSegmentsTaskMetadata.getWatermarkMs(); } + + @Override + public void validateTaskConfigs(TableConfig tableConfig, Map<String, String> taskConfigs) { + // check table is not upsert + Preconditions.checkState(tableConfig.getUpsertMode() == UpsertConfig.Mode.NONE, + "RealtimeToOfflineTask doesn't support upsert table!"); + // check no malformed period + TimeUtils.convertPeriodToMillis( + taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUFFER_TIME_PERIOD_KEY, "2d")); + TimeUtils.convertPeriodToMillis( + taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUCKET_TIME_PERIOD_KEY, "1d")); + TimeUtils.convertPeriodToMillis( + taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.ROUND_BUCKET_TIME_PERIOD_KEY, "1s")); + // check mergeType is correct + Preconditions.checkState(ImmutableSet.of(MergeType.CONCAT.name(), MergeType.ROLLUP.name(), MergeType.DEDUP.name()) + .contains(taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.MERGE_TYPE_KEY, MergeType.CONCAT.name()) + .toUpperCase()), "MergeType must be one of [CONCAT, ROLLUP, DEDUP]!"); + + Schema schema = _clusterInfoAccessor.getPinotHelixResourceManager().getSchemaForTableConfig(tableConfig); + // check no mis-configured columns + Set<String> columnNames = schema.getColumnNames(); + for (Map.Entry<String, String> entry : taskConfigs.entrySet()) { + if (entry.getKey().endsWith(".aggregationType")) { + Preconditions.checkState(columnNames.contains( + StringUtils.removeEnd(entry.getKey(), RealtimeToOfflineSegmentsTask.AGGREGATION_TYPE_KEY_SUFFIX)), + String.format("Column \"%s\" not found in schema!", entry.getKey())); + try { + // check that it's a valid aggregation function type + AggregationFunctionType aft = AggregationFunctionType.getAggregationFunctionType(entry.getValue()); + // check that a value aggregator is available + if (!MinionConstants.RealtimeToOfflineSegmentsTask.AVAILABLE_CORE_VALUE_AGGREGATORS.contains(aft)) { + throw new IllegalArgumentException("ValueAggregator not enabled for type: " + aft.toString()); + } + } catch (IllegalArgumentException e) { + String err = + String.format("Column \"%s\" has invalid aggregate type: %s", entry.getKey(), entry.getValue()); + throw new IllegalStateException(err); + } + } + } + } } diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java index 9f64e30cf1..ae74bc8ca9 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java @@ -304,4 +304,51 @@ public class UpsertCompactionTaskGenerator extends BaseTaskGenerator { } return true; } + + @Override + public void validateTaskConfigs(TableConfig tableConfig, Map<String, String> taskConfigs) { + // check table is realtime + Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME, + "UpsertCompactionTask only supports realtime tables!"); + // check upsert enabled + Preconditions.checkState(tableConfig.isUpsertEnabled(), "Upsert must be enabled for UpsertCompactionTask"); + + // check no malformed period + if (taskConfigs.containsKey(UpsertCompactionTask.BUFFER_TIME_PERIOD_KEY)) { + TimeUtils.convertPeriodToMillis(taskConfigs.get(UpsertCompactionTask.BUFFER_TIME_PERIOD_KEY)); + } + // check invalidRecordsThresholdPercent + if (taskConfigs.containsKey(UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_PERCENT)) { + Preconditions.checkState( + Double.parseDouble(taskConfigs.get(UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_PERCENT)) >= 0 + && Double.parseDouble(taskConfigs.get(UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_PERCENT)) <= 100, + "invalidRecordsThresholdPercent must be >= 0 and <= 100"); + } + // check invalidRecordsThresholdCount + if (taskConfigs.containsKey(UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_COUNT)) { + Preconditions.checkState( + Long.parseLong(taskConfigs.get(UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_COUNT)) >= 1, + "invalidRecordsThresholdCount must be >= 1"); + } + // check that either invalidRecordsThresholdPercent or invalidRecordsThresholdCount was provided + Preconditions.checkState( + taskConfigs.containsKey(UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_PERCENT) || taskConfigs.containsKey( + UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_COUNT), + "invalidRecordsThresholdPercent or invalidRecordsThresholdCount or both must be provided"); + String validDocIdsType = + taskConfigs.getOrDefault(UpsertCompactionTask.VALID_DOC_IDS_TYPE, UpsertCompactionTask.SNAPSHOT); + if (validDocIdsType.equals(ValidDocIdsType.SNAPSHOT.toString())) { + UpsertConfig upsertConfig = tableConfig.getUpsertConfig(); + Preconditions.checkNotNull(upsertConfig, "UpsertConfig must be provided for UpsertCompactionTask"); + Preconditions.checkState(upsertConfig.isEnableSnapshot(), String.format( + "'enableSnapshot' from UpsertConfig must be enabled for UpsertCompactionTask with validDocIdsType = " + + "%s", validDocIdsType)); + } else if (validDocIdsType.equals(ValidDocIdsType.IN_MEMORY_WITH_DELETE.toString())) { + UpsertConfig upsertConfig = tableConfig.getUpsertConfig(); + Preconditions.checkNotNull(upsertConfig, "UpsertConfig must be provided for UpsertCompactionTask"); + Preconditions.checkNotNull(upsertConfig.getDeleteRecordColumn(), String.format( + "deleteRecordColumn must be provided for " + "UpsertCompactionTask with validDocIdsType = %s", + validDocIdsType)); + } + } } diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java index 84169f3b93..49a9fd8d57 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.plugin.minion.tasks.realtimetoofflinesegments; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import java.util.Collections; import java.util.HashMap; @@ -28,6 +29,7 @@ import org.apache.helix.model.IdealState; import org.apache.helix.task.TaskState; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor; import org.apache.pinot.core.common.MinionConstants; import org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask; @@ -35,9 +37,13 @@ import org.apache.pinot.core.minion.PinotTaskConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableTaskConfig; import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.config.table.UpsertConfig; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.stream.StreamConfigProperties; import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -508,6 +514,137 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest { assertTrue(pinotTaskConfigs.isEmpty()); } + @Test + public void testRealtimeToOfflineSegmentsTaskConfig() { + ClusterInfoAccessor mockClusterInfoAccessor = mock(ClusterInfoAccessor.class); + PinotHelixResourceManager mockPinotHelixResourceManager = mock(PinotHelixResourceManager.class); + when(mockClusterInfoAccessor.getPinotHelixResourceManager()).thenReturn(mockPinotHelixResourceManager); + + Schema schema = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME) + .addSingleValueDimension("myCol", FieldSpec.DataType.STRING) + .addDateTime(TIME_COLUMN_NAME, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS") + .setPrimaryKeyColumns(Lists.newArrayList("myCol")).build(); + + when(mockPinotHelixResourceManager.getSchemaForTableConfig(Mockito.any())).thenReturn(schema); + + RealtimeToOfflineSegmentsTaskGenerator taskGenerator = new RealtimeToOfflineSegmentsTaskGenerator(); + taskGenerator.init(mockClusterInfoAccessor); + + Map<String, String> realtimeToOfflineTaskConfig = + ImmutableMap.of("schedule", "0 */10 * ? * * *", "bucketTimePeriod", "6h", "bufferTimePeriod", "5d", "mergeType", + "rollup", "myCol.aggregationType", "max"); + + Map<String, String> segmentGenerationAndPushTaskConfig = ImmutableMap.of("schedule", "0 */10 * ? * * *"); + + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTaskConfig( + new TableTaskConfig(ImmutableMap.of("RealtimeToOfflineSegmentsTask", realtimeToOfflineTaskConfig, + "SegmentGenerationAndPushTask", segmentGenerationAndPushTaskConfig))).build(); + + // validate valid config + taskGenerator.validateTaskConfigs(tableConfig, realtimeToOfflineTaskConfig); + + // invalid Upsert config with RealtimeToOfflineTask + tableConfig = + new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME) + .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL)).setTaskConfig(new TableTaskConfig( + ImmutableMap.of("RealtimeToOfflineSegmentsTask", realtimeToOfflineTaskConfig, + "SegmentGenerationAndPushTask", segmentGenerationAndPushTaskConfig))).build(); + try { + taskGenerator.validateTaskConfigs(tableConfig, realtimeToOfflineTaskConfig); + Assert.fail(); + } catch (IllegalStateException e) { + Assert.assertTrue(e.getMessage().contains("RealtimeToOfflineTask doesn't support upsert table")); + } + + // invalid period + HashMap<String, String> invalidPeriodConfig = new HashMap<>(realtimeToOfflineTaskConfig); + invalidPeriodConfig.put("roundBucketTimePeriod", "garbage"); + tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTaskConfig( + new TableTaskConfig( + ImmutableMap.of("RealtimeToOfflineSegmentsTask", invalidPeriodConfig, "SegmentGenerationAndPushTask", + segmentGenerationAndPushTaskConfig))).build(); + try { + taskGenerator.validateTaskConfigs(tableConfig, invalidPeriodConfig); + Assert.fail(); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().contains("Invalid time spec")); + } + + // invalid mergeType + HashMap<String, String> invalidMergeType = new HashMap<>(realtimeToOfflineTaskConfig); + invalidMergeType.put("mergeType", "garbage"); + tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTaskConfig( + new TableTaskConfig( + ImmutableMap.of("RealtimeToOfflineSegmentsTask", invalidMergeType, "SegmentGenerationAndPushTask", + segmentGenerationAndPushTaskConfig))).build(); + try { + taskGenerator.validateTaskConfigs(tableConfig, invalidMergeType); + Assert.fail(); + } catch (IllegalStateException e) { + Assert.assertTrue(e.getMessage().contains("MergeType must be one of")); + } + + // invalid column + HashMap<String, String> invalidColumnConfig = new HashMap<>(realtimeToOfflineTaskConfig); + invalidColumnConfig.put("score.aggregationType", "max"); + tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTaskConfig( + new TableTaskConfig( + ImmutableMap.of("RealtimeToOfflineSegmentsTask", invalidColumnConfig, "SegmentGenerationAndPushTask", + segmentGenerationAndPushTaskConfig))).build(); + try { + taskGenerator.validateTaskConfigs(tableConfig, invalidColumnConfig); + Assert.fail(); + } catch (IllegalStateException e) { + Assert.assertTrue(e.getMessage().contains("not found in schema")); + } + + // invalid agg + HashMap<String, String> invalidAggConfig = new HashMap<>(realtimeToOfflineTaskConfig); + invalidAggConfig.put("myCol.aggregationType", "garbage"); + tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTaskConfig( + new TableTaskConfig( + ImmutableMap.of("RealtimeToOfflineSegmentsTask", invalidAggConfig, "SegmentGenerationAndPushTask", + segmentGenerationAndPushTaskConfig))).build(); + try { + taskGenerator.validateTaskConfigs(tableConfig, invalidAggConfig); + Assert.fail(); + } catch (IllegalStateException e) { + Assert.assertTrue(e.getMessage().contains("has invalid aggregate type")); + } + + // aggregation function that exists but has no ValueAggregator available + HashMap<String, String> invalidAgg2Config = new HashMap<>(realtimeToOfflineTaskConfig); + invalidAgg2Config.put("myCol.aggregationType", "Histogram"); + tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTaskConfig( + new TableTaskConfig( + ImmutableMap.of("RealtimeToOfflineSegmentsTask", invalidAgg2Config, "SegmentGenerationAndPushTask", + segmentGenerationAndPushTaskConfig))).build(); + try { + taskGenerator.validateTaskConfigs(tableConfig, invalidAgg2Config); + Assert.fail(); + } catch (IllegalStateException e) { + Assert.assertTrue(e.getMessage().contains("has invalid aggregate type")); + } + + // valid agg + HashMap<String, String> validAggConfig = new HashMap<>(realtimeToOfflineTaskConfig); + validAggConfig.put("myCol.aggregationType", "distinctCountHLL"); + tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTaskConfig( + new TableTaskConfig( + ImmutableMap.of("RealtimeToOfflineSegmentsTask", validAggConfig, "SegmentGenerationAndPushTask", + segmentGenerationAndPushTaskConfig))).build(); + taskGenerator.validateTaskConfigs(tableConfig, validAggConfig); + + // valid agg + HashMap<String, String> validAgg2Config = new HashMap<>(realtimeToOfflineTaskConfig); + validAgg2Config.put("myCol.aggregationType", "distinctCountHLLPlus"); + tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTaskConfig( + new TableTaskConfig( + ImmutableMap.of("RealtimeToOfflineSegmentsTask", validAgg2Config, "SegmentGenerationAndPushTask", + segmentGenerationAndPushTaskConfig))).build(); + taskGenerator.validateTaskConfigs(tableConfig, validAgg2Config); + } + private SegmentZKMetadata getSegmentZKMetadata(String segmentName, Status status, long startTime, long endTime, TimeUnit timeUnit, String downloadURL) { SegmentZKMetadata realtimeSegmentZKMetadata = new SegmentZKMetadata(segmentName); diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java index ec8d8ea786..604c1aa476 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java @@ -19,6 +19,7 @@ package org.apache.pinot.plugin.minion.tasks.upsertcompaction; import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.collect.ImmutableMap; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -330,6 +331,63 @@ public class UpsertCompactionTaskGeneratorTest { _completedSegment2.getSegmentName()); } + @Test + public void testUpsertCompactionTaskConfig() { + Map<String, String> upsertCompactionTaskConfig = + ImmutableMap.of("bufferTimePeriod", "5d", "invalidRecordsThresholdPercent", "1", "invalidRecordsThresholdCount", + "1"); + UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL); + upsertConfig.setEnableSnapshot(true); + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME) + .setUpsertConfig(upsertConfig) + .setTaskConfig(new TableTaskConfig(ImmutableMap.of("UpsertCompactionTask", upsertCompactionTaskConfig))) + .build(); + + _taskGenerator.validateTaskConfigs(tableConfig, upsertCompactionTaskConfig); + + // test with invalidRecordsThresholdPercents as 0 + Map<String, String> upsertCompactionTaskConfig1 = ImmutableMap.of("invalidRecordsThresholdPercent", "0"); + TableConfig zeroPercentTableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME) + .setUpsertConfig(upsertConfig) + .setTaskConfig(new TableTaskConfig(ImmutableMap.of("UpsertCompactionTask", upsertCompactionTaskConfig1))) + .build(); + _taskGenerator.validateTaskConfigs(zeroPercentTableConfig, upsertCompactionTaskConfig1); + + // test with invalid invalidRecordsThresholdPercents as -1 and 110 + Map<String, String> upsertCompactionTaskConfig2 = ImmutableMap.of("invalidRecordsThresholdPercent", "-1"); + TableConfig negativePercentTableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME) + .setUpsertConfig(upsertConfig) + .setTaskConfig(new TableTaskConfig(ImmutableMap.of("UpsertCompactionTask", upsertCompactionTaskConfig2))) + .build(); + Assert.assertThrows(IllegalStateException.class, + () -> _taskGenerator.validateTaskConfigs(negativePercentTableConfig, upsertCompactionTaskConfig2)); + Map<String, String> upsertCompactionTaskConfig3 = ImmutableMap.of("invalidRecordsThresholdPercent", "110"); + TableConfig hundredTenPercentTableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME) + .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL)) + .setTaskConfig(new TableTaskConfig(ImmutableMap.of("UpsertCompactionTask", upsertCompactionTaskConfig3))) + .build(); + Assert.assertThrows(IllegalStateException.class, + () -> _taskGenerator.validateTaskConfigs(hundredTenPercentTableConfig, upsertCompactionTaskConfig3)); + + // test with invalid invalidRecordsThresholdCount + Map<String, String> upsertCompactionTaskConfig4 = ImmutableMap.of("invalidRecordsThresholdCount", "0"); + TableConfig invalidCountTableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME) + .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL)) + .setTaskConfig(new TableTaskConfig(ImmutableMap.of("UpsertCompactionTask", upsertCompactionTaskConfig4))) + .build(); + Assert.assertThrows(IllegalStateException.class, + () -> _taskGenerator.validateTaskConfigs(invalidCountTableConfig, upsertCompactionTaskConfig4)); + + // test without invalidRecordsThresholdPercent or invalidRecordsThresholdCount + Map<String, String> upsertCompactionTaskConfig5 = ImmutableMap.of("bufferTimePeriod", "5d"); + TableConfig invalidTableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME) + .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL)) + .setTaskConfig(new TableTaskConfig(ImmutableMap.of("UpsertCompactionTask", upsertCompactionTaskConfig5))) + .build(); + Assert.assertThrows(IllegalStateException.class, + () -> _taskGenerator.validateTaskConfigs(invalidTableConfig, upsertCompactionTaskConfig5)); + } + private Map<String, String> getCompactionConfigs(String invalidRecordsThresholdPercent, String invalidRecordsThresholdCount) { Map<String, String> compactionConfigs = new HashMap<>(); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java index 959fdfc51d..722fb3e4b9 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java @@ -40,7 +40,6 @@ import org.apache.commons.lang3.StringUtils; import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.common.request.context.FunctionContext; import org.apache.pinot.common.request.context.RequestContextUtils; -import org.apache.pinot.common.restlet.resources.ValidDocIdsType; import org.apache.pinot.common.tier.TierFactory; import org.apache.pinot.common.utils.config.TagNameUtils; import org.apache.pinot.segment.local.function.FunctionEvaluator; @@ -94,7 +93,6 @@ import org.apache.pinot.spi.utils.DataSizeUtils; import org.apache.pinot.spi.utils.IngestionConfigUtils; import org.apache.pinot.spi.utils.TimeUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; -import org.quartz.CronScheduleBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -110,11 +108,9 @@ public final class TableConfigUtils { } private static final Logger LOGGER = LoggerFactory.getLogger(TableConfigUtils.class); - private static final String SCHEDULE_KEY = "schedule"; private static final String STAR_TREE_CONFIG_NAME = "StarTreeIndex Config"; // supported TableTaskTypes, must be identical to the one return in the impl of {@link PinotTaskGenerator}. - private static final String REALTIME_TO_OFFLINE_TASK_TYPE = "RealtimeToOfflineSegmentsTask"; private static final String UPSERT_COMPACTION_TASK_TYPE = "UpsertCompactionTask"; // this is duplicate with KinesisConfig.STREAM_TYPE, while instead of use KinesisConfig.STREAM_TYPE directly, we @@ -192,9 +188,6 @@ public final class TableConfigUtils { validateUpsertAndDedupConfig(tableConfig, schema); validatePartialUpsertStrategies(tableConfig, schema); } - if (!skipTypes.contains(ValidationType.TASK)) { - validateTaskConfigs(tableConfig, schema); - } if (_enforcePoolBasedAssignment) { validateInstancePoolsNReplicaGroups(tableConfig); @@ -665,121 +658,6 @@ public final class TableConfigUtils { } } - public final static EnumSet<AggregationFunctionType> AVAILABLE_CORE_VALUE_AGGREGATORS = - EnumSet.of(MIN, MAX, SUM, DISTINCTCOUNTHLL, DISTINCTCOUNTRAWHLL, DISTINCTCOUNTTHETASKETCH, - DISTINCTCOUNTRAWTHETASKETCH, DISTINCTCOUNTTUPLESKETCH, DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH, - SUMVALUESINTEGERSUMTUPLESKETCH, AVGVALUEINTEGERSUMTUPLESKETCH, DISTINCTCOUNTHLLPLUS, DISTINCTCOUNTRAWHLLPLUS, - DISTINCTCOUNTCPCSKETCH, DISTINCTCOUNTRAWCPCSKETCH, DISTINCTCOUNTULL, DISTINCTCOUNTRAWULL); - - @VisibleForTesting - static void validateTaskConfigs(TableConfig tableConfig, Schema schema) { - TableTaskConfig taskConfig = tableConfig.getTaskConfig(); - if (taskConfig != null) { - for (Map.Entry<String, Map<String, String>> taskConfigEntry : taskConfig.getTaskTypeConfigsMap().entrySet()) { - String taskTypeConfigName = taskConfigEntry.getKey(); - Map<String, String> taskTypeConfig = taskConfigEntry.getValue(); - // Task configuration cannot be null. - Preconditions.checkNotNull(taskTypeConfig, - String.format("Task configuration for \"%s\" cannot be null!", taskTypeConfigName)); - // Schedule key for task config has to be set. - if (taskTypeConfig.containsKey(SCHEDULE_KEY)) { - String cronExprStr = taskTypeConfig.get(SCHEDULE_KEY); - try { - CronScheduleBuilder.cronSchedule(cronExprStr); - } catch (Exception e) { - throw new IllegalStateException( - String.format("Task %s contains an invalid cron schedule: %s", taskTypeConfigName, cronExprStr), e); - } - } - boolean isAllowDownloadFromServer = Boolean.parseBoolean( - taskTypeConfig.getOrDefault(TableTaskConfig.MINION_ALLOW_DOWNLOAD_FROM_SERVER, - String.valueOf(TableTaskConfig.DEFAULT_MINION_ALLOW_DOWNLOAD_FROM_SERVER))); - if (isAllowDownloadFromServer) { - Preconditions.checkState(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme() != null, - String.format("Table %s has task %s with allowDownloadFromServer set to true, but " - + "peerSegmentDownloadScheme is not set in the table config", tableConfig.getTableName(), - taskTypeConfigName)); - } - // Task Specific validation for REALTIME_TO_OFFLINE_TASK_TYPE - // TODO task specific validate logic should directly call to PinotTaskGenerator API - if (taskTypeConfigName.equals(REALTIME_TO_OFFLINE_TASK_TYPE)) { - // check table is not upsert - Preconditions.checkState(tableConfig.getUpsertMode() == UpsertConfig.Mode.NONE, - "RealtimeToOfflineTask doesn't support upsert table!"); - // check no malformed period - TimeUtils.convertPeriodToMillis(taskTypeConfig.getOrDefault("bufferTimePeriod", "2d")); - TimeUtils.convertPeriodToMillis(taskTypeConfig.getOrDefault("bucketTimePeriod", "1d")); - TimeUtils.convertPeriodToMillis(taskTypeConfig.getOrDefault("roundBucketTimePeriod", "1s")); - // check mergeType is correct - Preconditions.checkState(ImmutableSet.of("CONCAT", "ROLLUP", "DEDUP") - .contains(taskTypeConfig.getOrDefault("mergeType", "CONCAT").toUpperCase()), - "MergeType must be one of [CONCAT, ROLLUP, DEDUP]!"); - // check no mis-configured columns - Set<String> columnNames = schema.getColumnNames(); - for (Map.Entry<String, String> entry : taskTypeConfig.entrySet()) { - if (entry.getKey().endsWith(".aggregationType")) { - Preconditions.checkState(columnNames.contains(StringUtils.removeEnd(entry.getKey(), ".aggregationType")), - String.format("Column \"%s\" not found in schema!", entry.getKey())); - try { - // check that it's a valid aggregation function type - AggregationFunctionType aft = AggregationFunctionType.getAggregationFunctionType(entry.getValue()); - // check that a value aggregator is available - if (!AVAILABLE_CORE_VALUE_AGGREGATORS.contains(aft)) { - throw new IllegalArgumentException("ValueAggregator not enabled for type: " + aft.toString()); - } - } catch (IllegalArgumentException e) { - String err = - String.format("Column \"%s\" has invalid aggregate type: %s", entry.getKey(), entry.getValue()); - throw new IllegalStateException(err); - } - } - } - } else if (taskTypeConfigName.equals(UPSERT_COMPACTION_TASK_TYPE)) { - // check table is realtime - Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME, - "UpsertCompactionTask only supports realtime tables!"); - // check upsert enabled - Preconditions.checkState(tableConfig.isUpsertEnabled(), "Upsert must be enabled for UpsertCompactionTask"); - - // check no malformed period - if (taskTypeConfig.containsKey("bufferTimePeriod")) { - TimeUtils.convertPeriodToMillis(taskTypeConfig.get("bufferTimePeriod")); - } - // check invalidRecordsThresholdPercent - if (taskTypeConfig.containsKey("invalidRecordsThresholdPercent")) { - Preconditions.checkState(Double.parseDouble(taskTypeConfig.get("invalidRecordsThresholdPercent")) >= 0 - && Double.parseDouble(taskTypeConfig.get("invalidRecordsThresholdPercent")) <= 100, - "invalidRecordsThresholdPercent must be >= 0 and <= 100"); - } - // check invalidRecordsThresholdCount - if (taskTypeConfig.containsKey("invalidRecordsThresholdCount")) { - Preconditions.checkState(Long.parseLong(taskTypeConfig.get("invalidRecordsThresholdCount")) >= 1, - "invalidRecordsThresholdCount must be >= 1"); - } - // check that either invalidRecordsThresholdPercent or invalidRecordsThresholdCount was provided - Preconditions.checkState( - taskTypeConfig.containsKey("invalidRecordsThresholdPercent") || taskTypeConfig.containsKey( - "invalidRecordsThresholdCount"), - "invalidRecordsThresholdPercent or invalidRecordsThresholdCount or both must be provided"); - String validDocIdsType = taskTypeConfig.getOrDefault("validDocIdsType", "snapshot"); - if (validDocIdsType.equals(ValidDocIdsType.SNAPSHOT.toString())) { - UpsertConfig upsertConfig = tableConfig.getUpsertConfig(); - Preconditions.checkNotNull(upsertConfig, "UpsertConfig must be provided for UpsertCompactionTask"); - Preconditions.checkState(upsertConfig.isEnableSnapshot(), String.format( - "'enableSnapshot' from UpsertConfig must be enabled for UpsertCompactionTask with validDocIdsType = " - + "%s", validDocIdsType)); - } else if (validDocIdsType.equals(ValidDocIdsType.IN_MEMORY_WITH_DELETE.toString())) { - UpsertConfig upsertConfig = tableConfig.getUpsertConfig(); - Preconditions.checkNotNull(upsertConfig, "UpsertConfig must be provided for UpsertCompactionTask"); - Preconditions.checkNotNull(upsertConfig.getDeleteRecordColumn(), String.format( - "deleteRecordColumn must be provided for " + "UpsertCompactionTask with validDocIdsType = %s", - validDocIdsType)); - } - } - } - } - } - /** * Validates the upsert-related configurations * - check table type is realtime diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java index 72992e99cf..98b0ba552c 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java @@ -44,7 +44,6 @@ import org.apache.pinot.spi.config.table.StarTreeAggregationConfig; import org.apache.pinot.spi.config.table.StarTreeIndexConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableCustomConfig; -import org.apache.pinot.spi.config.table.TableTaskConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.table.TagOverrideConfig; import org.apache.pinot.spi.config.table.TenantConfig; @@ -2273,147 +2272,6 @@ public class TableConfigUtilsTest { }); } - @Test - public void testTaskConfig() { - Schema schema = - new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol", FieldSpec.DataType.STRING) - .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS") - .setPrimaryKeyColumns(Lists.newArrayList("myCol")).build(); - Map<String, String> realtimeToOfflineTaskConfig = - ImmutableMap.of("schedule", "0 */10 * ? * * *", "bucketTimePeriod", "6h", "bufferTimePeriod", "5d", "mergeType", - "rollup", "myCol.aggregationType", "max"); - Map<String, String> segmentGenerationAndPushTaskConfig = ImmutableMap.of("schedule", "0 */10 * ? * * *"); - TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTaskConfig( - new TableTaskConfig(ImmutableMap.of("RealtimeToOfflineSegmentsTask", realtimeToOfflineTaskConfig, - "SegmentGenerationAndPushTask", segmentGenerationAndPushTaskConfig))).build(); - - // validate valid config - TableConfigUtils.validateTaskConfigs(tableConfig, schema); - - // invalid schedule - HashMap<String, String> invalidScheduleConfig = new HashMap<>(segmentGenerationAndPushTaskConfig); - invalidScheduleConfig.put("schedule", "garbage"); - tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTaskConfig(new TableTaskConfig( - ImmutableMap.of("RealtimeToOfflineSegmentsTask", realtimeToOfflineTaskConfig, "SegmentGenerationAndPushTask", - invalidScheduleConfig))).build(); - try { - TableConfigUtils.validateTaskConfigs(tableConfig, schema); - Assert.fail(); - } catch (IllegalStateException e) { - Assert.assertTrue(e.getMessage().contains("contains an invalid cron schedule")); - } - - // invalid allowDownloadFromServer config - HashMap<String, String> invalidAllowDownloadFromServerConfig = new HashMap<>(segmentGenerationAndPushTaskConfig); - invalidAllowDownloadFromServerConfig.put("allowDownloadFromServer", "true"); - tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTaskConfig(new TableTaskConfig( - ImmutableMap.of("RealtimeToOfflineSegmentsTask", realtimeToOfflineTaskConfig, "SegmentGenerationAndPushTask", - invalidAllowDownloadFromServerConfig))).build(); - try { - TableConfigUtils.validateTaskConfigs(tableConfig, schema); - Assert.fail(); - } catch (IllegalStateException e) { - Assert.assertTrue(e.getMessage().contains("allowDownloadFromServer set to true, but " - + "peerSegmentDownloadScheme is not set in the table config")); - } - - // invalid Upsert config with RealtimeToOfflineTask - tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN) - .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL)).setStreamConfigs(getStreamConfigs()).setTaskConfig( - new TableTaskConfig(ImmutableMap.of("RealtimeToOfflineSegmentsTask", realtimeToOfflineTaskConfig, - "SegmentGenerationAndPushTask", segmentGenerationAndPushTaskConfig))).build(); - try { - TableConfigUtils.validateTaskConfigs(tableConfig, schema); - Assert.fail(); - } catch (IllegalStateException e) { - Assert.assertTrue(e.getMessage().contains("RealtimeToOfflineTask doesn't support upsert table")); - } - - // validate that TASK config will be skipped with skip string. - TableConfigUtils.validate(tableConfig, schema, "TASK,UPSERT"); - - // invalid period - HashMap<String, String> invalidPeriodConfig = new HashMap<>(realtimeToOfflineTaskConfig); - invalidPeriodConfig.put("roundBucketTimePeriod", "garbage"); - tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTaskConfig(new TableTaskConfig( - ImmutableMap.of("RealtimeToOfflineSegmentsTask", invalidPeriodConfig, "SegmentGenerationAndPushTask", - segmentGenerationAndPushTaskConfig))).build(); - try { - TableConfigUtils.validateTaskConfigs(tableConfig, schema); - Assert.fail(); - } catch (IllegalArgumentException e) { - Assert.assertTrue(e.getMessage().contains("Invalid time spec")); - } - - // invalid mergeType - HashMap<String, String> invalidMergeType = new HashMap<>(realtimeToOfflineTaskConfig); - invalidMergeType.put("mergeType", "garbage"); - tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTaskConfig(new TableTaskConfig( - ImmutableMap.of("RealtimeToOfflineSegmentsTask", invalidMergeType, "SegmentGenerationAndPushTask", - segmentGenerationAndPushTaskConfig))).build(); - try { - TableConfigUtils.validateTaskConfigs(tableConfig, schema); - Assert.fail(); - } catch (IllegalStateException e) { - Assert.assertTrue(e.getMessage().contains("MergeType must be one of")); - } - - // invalid column - HashMap<String, String> invalidColumnConfig = new HashMap<>(realtimeToOfflineTaskConfig); - invalidColumnConfig.put("score.aggregationType", "max"); - tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTaskConfig(new TableTaskConfig( - ImmutableMap.of("RealtimeToOfflineSegmentsTask", invalidColumnConfig, "SegmentGenerationAndPushTask", - segmentGenerationAndPushTaskConfig))).build(); - try { - TableConfigUtils.validateTaskConfigs(tableConfig, schema); - Assert.fail(); - } catch (IllegalStateException e) { - Assert.assertTrue(e.getMessage().contains("not found in schema")); - } - - // invalid agg - HashMap<String, String> invalidAggConfig = new HashMap<>(realtimeToOfflineTaskConfig); - invalidAggConfig.put("myCol.aggregationType", "garbage"); - tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTaskConfig(new TableTaskConfig( - ImmutableMap.of("RealtimeToOfflineSegmentsTask", invalidAggConfig, "SegmentGenerationAndPushTask", - segmentGenerationAndPushTaskConfig))).build(); - try { - TableConfigUtils.validateTaskConfigs(tableConfig, schema); - Assert.fail(); - } catch (IllegalStateException e) { - Assert.assertTrue(e.getMessage().contains("has invalid aggregate type")); - } - - // aggregation function that exists but has no ValueAggregator available - HashMap<String, String> invalidAgg2Config = new HashMap<>(realtimeToOfflineTaskConfig); - invalidAgg2Config.put("myCol.aggregationType", "Histogram"); - tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTaskConfig(new TableTaskConfig( - ImmutableMap.of("RealtimeToOfflineSegmentsTask", invalidAgg2Config, "SegmentGenerationAndPushTask", - segmentGenerationAndPushTaskConfig))).build(); - try { - TableConfigUtils.validateTaskConfigs(tableConfig, schema); - Assert.fail(); - } catch (IllegalStateException e) { - Assert.assertTrue(e.getMessage().contains("has invalid aggregate type")); - } - - // valid agg - HashMap<String, String> validAggConfig = new HashMap<>(realtimeToOfflineTaskConfig); - validAggConfig.put("myCol.aggregationType", "distinctCountHLL"); - tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTaskConfig(new TableTaskConfig( - ImmutableMap.of("RealtimeToOfflineSegmentsTask", validAggConfig, "SegmentGenerationAndPushTask", - segmentGenerationAndPushTaskConfig))).build(); - TableConfigUtils.validateTaskConfigs(tableConfig, schema); - - // valid agg - HashMap<String, String> validAgg2Config = new HashMap<>(realtimeToOfflineTaskConfig); - validAgg2Config.put("myCol.aggregationType", "distinctCountHLLPlus"); - tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTaskConfig(new TableTaskConfig( - ImmutableMap.of("RealtimeToOfflineSegmentsTask", validAgg2Config, "SegmentGenerationAndPushTask", - segmentGenerationAndPushTaskConfig))).build(); - TableConfigUtils.validateTaskConfigs(tableConfig, schema); - } - @Test public void testValidateInstancePartitionsMap() { InstanceAssignmentConfig instanceAssignmentConfig = Mockito.mock(InstanceAssignmentConfig.class); @@ -2548,67 +2406,6 @@ public class TableConfigUtilsTest { } } - @Test - public void testUpsertCompactionTaskConfig() { - Schema schema = - new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol", FieldSpec.DataType.STRING) - .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS") - .setPrimaryKeyColumns(Lists.newArrayList("myCol")).build(); - Map<String, String> upsertCompactionTaskConfig = - ImmutableMap.of("bufferTimePeriod", "5d", "invalidRecordsThresholdPercent", "1", "invalidRecordsThresholdCount", - "1"); - UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL); - upsertConfig.setEnableSnapshot(true); - TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) - .setUpsertConfig(upsertConfig) - .setTaskConfig(new TableTaskConfig(ImmutableMap.of("UpsertCompactionTask", upsertCompactionTaskConfig))) - .build(); - - TableConfigUtils.validateTaskConfigs(tableConfig, schema); - - // test with invalidRecordsThresholdPercents as 0 - upsertCompactionTaskConfig = ImmutableMap.of("invalidRecordsThresholdPercent", "0"); - TableConfig zeroPercentTableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) - .setUpsertConfig(upsertConfig) - .setTaskConfig(new TableTaskConfig(ImmutableMap.of("UpsertCompactionTask", upsertCompactionTaskConfig))) - .build(); - TableConfigUtils.validateTaskConfigs(zeroPercentTableConfig, schema); - - // test with invalid invalidRecordsThresholdPercents as -1 and 110 - upsertCompactionTaskConfig = ImmutableMap.of("invalidRecordsThresholdPercent", "-1"); - TableConfig negativePercentTableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) - .setUpsertConfig(upsertConfig) - .setTaskConfig(new TableTaskConfig(ImmutableMap.of("UpsertCompactionTask", upsertCompactionTaskConfig))) - .build(); - Assert.assertThrows(IllegalStateException.class, - () -> TableConfigUtils.validateTaskConfigs(negativePercentTableConfig, schema)); - upsertCompactionTaskConfig = ImmutableMap.of("invalidRecordsThresholdPercent", "110"); - TableConfig hundredTenPercentTableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) - .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL)) - .setTaskConfig(new TableTaskConfig(ImmutableMap.of("UpsertCompactionTask", upsertCompactionTaskConfig))) - .build(); - Assert.assertThrows(IllegalStateException.class, - () -> TableConfigUtils.validateTaskConfigs(hundredTenPercentTableConfig, schema)); - - // test with invalid invalidRecordsThresholdCount - upsertCompactionTaskConfig = ImmutableMap.of("invalidRecordsThresholdCount", "0"); - TableConfig invalidCountTableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) - .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL)) - .setTaskConfig(new TableTaskConfig(ImmutableMap.of("UpsertCompactionTask", upsertCompactionTaskConfig))) - .build(); - Assert.assertThrows(IllegalStateException.class, - () -> TableConfigUtils.validateTaskConfigs(invalidCountTableConfig, schema)); - - // test without invalidRecordsThresholdPercent or invalidRecordsThresholdCount - upsertCompactionTaskConfig = ImmutableMap.of("bufferTimePeriod", "5d"); - TableConfig invalidTableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) - .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL)) - .setTaskConfig(new TableTaskConfig(ImmutableMap.of("UpsertCompactionTask", upsertCompactionTaskConfig))) - .build(); - Assert.assertThrows(IllegalStateException.class, - () -> TableConfigUtils.validateTaskConfigs(invalidTableConfig, schema)); - } - @Test public void testValidatePartitionedReplicaGroupInstance() { String partitionColumn = "testPartitionCol"; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org