This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new cba1cb5 Add BaseTaskGenerator to make task timeout and concurrency configurable (#8028) cba1cb5 is described below commit cba1cb5e2bc5e044802d94553391b1b2802a15fe Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Fri Jan 14 17:49:19 2022 -0800 Add BaseTaskGenerator to make task timeout and concurrency configurable (#8028) Task timeout and concurrency can be configured via the cluster config: - <TaskType>.timeoutMs - <TaskType>.numConcurrentTasksPerInstance --- .../helix/core/minion/PinotTaskManager.java | 7 +++ .../core/minion/generator/BaseTaskGenerator.java | 71 ++++++++++++++++++++++ .../apache/pinot/core/common/MinionConstants.java | 9 +++ .../tests/SimpleMinionClusterIntegrationTest.java | 19 +++++- .../plugin/minion/tasks/TestTaskGenerator.java | 11 +--- .../ConvertToRawIndexTaskGenerator.java | 12 +--- .../mergerollup/MergeRollupTaskGenerator.java | 16 ++--- .../RealtimeToOfflineSegmentsTaskGenerator.java | 11 +--- .../SegmentGenerationAndPushTaskGenerator.java | 28 +-------- 9 files changed, 117 insertions(+), 67 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java index 5bafa0a..475ffd2 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java @@ -345,6 +345,13 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { } /** + * Returns the task generator registry. + */ + public TaskGeneratorRegistry getTaskGeneratorRegistry() { + return _taskGeneratorRegistry; + } + + /** * Registers a task generator. * <p>This method can be used to plug in custom task generators. */ diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/BaseTaskGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/BaseTaskGenerator.java new file mode 100644 index 0000000..4808d97 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/BaseTaskGenerator.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.minion.generator; + +import org.apache.helix.task.JobConfig; +import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor; +import org.apache.pinot.core.common.MinionConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Base implementation of the {@link PinotTaskGenerator} which reads the 'taskTimeoutMs' and + * 'numConcurrentTasksPerInstance' from the cluster config. + */ +public abstract class BaseTaskGenerator implements PinotTaskGenerator { + protected static final Logger LOGGER = LoggerFactory.getLogger(BaseTaskGenerator.class); + + protected ClusterInfoAccessor _clusterInfoAccessor; + + @Override + public void init(ClusterInfoAccessor clusterInfoAccessor) { + _clusterInfoAccessor = clusterInfoAccessor; + } + + @Override + public long getTaskTimeoutMs() { + String taskType = getTaskType(); + String configKey = taskType + MinionConstants.TIMEOUT_MS_KEY_SUFFIX; + String configValue = _clusterInfoAccessor.getClusterConfig(configKey); + if (configValue != null) { + try { + return Long.parseLong(configValue); + } catch (Exception e) { + LOGGER.error("Invalid cluster config {}: '{}'", configKey, configValue, e); + } + } + return JobConfig.DEFAULT_TIMEOUT_PER_TASK; + } + + @Override + public int getNumConcurrentTasksPerInstance() { + String taskType = getTaskType(); + String configKey = taskType + MinionConstants.NUM_CONCURRENT_TASKS_PER_INSTANCE_KEY_SUFFIX; + String configValue = _clusterInfoAccessor.getClusterConfig(configKey); + if (configValue != null) { + try { + return Integer.parseInt(configValue); + } catch (Exception e) { + LOGGER.error("Invalid config {}: '{}'", configKey, configValue, e); + } + } + return JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE; + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java index 0ce9980..ccf1b44 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java @@ -47,6 +47,15 @@ public class MinionConstants { public static final String INITIAL_RETRY_DELAY_MS_KEY = "initialRetryDelayMs"; public static final String RETRY_SCALE_FACTOR_KEY = "retryScaleFactor"; + /** + * Cluster level configs + */ + public static final String TIMEOUT_MS_KEY_SUFFIX = ".timeoutMs"; + public static final String NUM_CONCURRENT_TASKS_PER_INSTANCE_KEY_SUFFIX = ".numConcurrentTasksPerInstance"; + + /** + * Table level configs + */ public static final String TABLE_MAX_NUM_TASKS_KEY = "tableMaxNumTasks"; public static final String ENABLE_REPLACE_SEGMENTS_KEY = "enableReplaceSegments"; diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java index 43f803a..d7d1f99 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java @@ -21,12 +21,16 @@ package org.apache.pinot.integration.tests; import java.util.Collection; import java.util.Collections; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.helix.model.HelixConfigScope; +import org.apache.helix.model.builder.HelixConfigScopeBuilder; import org.apache.helix.task.TaskState; import org.apache.pinot.common.metrics.ControllerGauge; import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager; import org.apache.pinot.controller.helix.core.minion.PinotTaskManager; import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator; +import org.apache.pinot.core.common.MinionConstants; import org.apache.pinot.minion.executor.PinotTaskExecutor; import org.apache.pinot.spi.config.table.TableTaskConfig; import org.apache.pinot.spi.config.table.TableType; @@ -70,6 +74,14 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest { startController(); startBroker(); startServer(); + startMinion(); + + // Set task timeout in cluster config + PinotHelixResourceManager helixResourceManager = _controllerStarter.getHelixResourceManager(); + helixResourceManager.getHelixAdmin().setConfig( + new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster( + helixResourceManager.getHelixClusterName()).build(), + Collections.singletonMap(TASK_TYPE + MinionConstants.TIMEOUT_MS_KEY_SUFFIX, Long.toString(600_000L))); // Add 3 offline tables, where 2 of them have TestTask enabled TableTaskConfig taskConfig = new TableTaskConfig(Collections.singletonMap(TASK_TYPE, Collections.emptyMap())); @@ -81,8 +93,13 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest { _helixTaskResourceManager = _controllerStarter.getHelixTaskResourceManager(); _taskManager = _controllerStarter.getTaskManager(); + } - startMinion(); + @Test + public void testTaskTimeout() { + PinotTaskGenerator taskGenerator = _taskManager.getTaskGeneratorRegistry().getTaskGenerator(TASK_TYPE); + assertNotNull(taskGenerator); + assertEquals(taskGenerator.getTaskTimeoutMs(), 600_000L); } private void verifyTaskCount(String task, int errors, int waiting, int running, int total) { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/plugin/minion/tasks/TestTaskGenerator.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/plugin/minion/tasks/TestTaskGenerator.java index 740d3e5..1efa7fb 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/plugin/minion/tasks/TestTaskGenerator.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/plugin/minion/tasks/TestTaskGenerator.java @@ -23,8 +23,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor; -import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator; +import org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator; import org.apache.pinot.core.minion.PinotTaskConfig; import org.apache.pinot.integration.tests.SimpleMinionClusterIntegrationTest; import org.apache.pinot.spi.annotations.minion.TaskGenerator; @@ -37,13 +36,7 @@ import static org.testng.Assert.assertEquals; * Task generator for {@link SimpleMinionClusterIntegrationTest}. */ @TaskGenerator -public class TestTaskGenerator implements PinotTaskGenerator { - private ClusterInfoAccessor _clusterInfoAccessor; - - @Override - public void init(ClusterInfoAccessor clusterInfoAccessor) { - _clusterInfoAccessor = clusterInfoAccessor; - } +public class TestTaskGenerator extends BaseTaskGenerator { @Override public String getTaskType() { diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskGenerator.java index 5fc7c64..0d0d769 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskGenerator.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskGenerator.java @@ -29,8 +29,7 @@ import org.apache.pinot.common.data.Segment; import org.apache.pinot.common.lineage.SegmentLineage; import org.apache.pinot.common.lineage.SegmentLineageUtils; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; -import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor; -import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator; +import org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator; import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtils; import org.apache.pinot.core.common.MinionConstants; import org.apache.pinot.core.minion.PinotTaskConfig; @@ -43,16 +42,9 @@ import org.slf4j.LoggerFactory; @TaskGenerator -public class ConvertToRawIndexTaskGenerator implements PinotTaskGenerator { +public class ConvertToRawIndexTaskGenerator extends BaseTaskGenerator { private static final Logger LOGGER = LoggerFactory.getLogger(ConvertToRawIndexTaskGenerator.class); - private ClusterInfoAccessor _clusterInfoAccessor; - - @Override - public void init(ClusterInfoAccessor clusterInfoAccessor) { - _clusterInfoAccessor = clusterInfoAccessor; - } - @Override public String getTaskType() { return MinionConstants.ConvertToRawIndexTask.TASK_TYPE; diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java index 1f81011..63139e6 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java @@ -39,7 +39,7 @@ import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.common.minion.MergeRollupTaskMetadata; -import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor; +import org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator; import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator; import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtils; import org.apache.pinot.core.common.MinionConstants; @@ -99,7 +99,7 @@ import org.slf4j.LoggerFactory; * maxNumRecordsPerTask */ @TaskGenerator -public class MergeRollupTaskGenerator implements PinotTaskGenerator { +public class MergeRollupTaskGenerator extends BaseTaskGenerator { private static final Logger LOGGER = LoggerFactory.getLogger(MergeRollupTaskGenerator.class); private static final int DEFAULT_MAX_NUM_RECORDS_PER_TASK = 50_000_000; @@ -113,17 +113,9 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator { private static final String MERGE_ROLLUP_TASK_DELAY_IN_NUM_BUCKETS = "mergeRollupTaskDelayInNumBuckets"; // tableNameWithType -> mergeLevel -> watermarkMs - private Map<String, Map<String, Long>> _mergeRollupWatermarks; + private final Map<String, Map<String, Long>> _mergeRollupWatermarks = new HashMap<>(); // tableNameWithType -> maxValidBucketEndTime - private Map<String, Long> _tableMaxValidBucketEndTimeMs; - private ClusterInfoAccessor _clusterInfoAccessor; - - @Override - public void init(ClusterInfoAccessor clusterInfoAccessor) { - _clusterInfoAccessor = clusterInfoAccessor; - _mergeRollupWatermarks = new HashMap<>(); - _tableMaxValidBucketEndTimeMs = new HashMap<>(); - } + private final Map<String, Long> _tableMaxValidBucketEndTimeMs = new HashMap<>(); @Override public String getTaskType() { diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java index 7b0b490..c96d5d4 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java @@ -31,7 +31,7 @@ import org.apache.helix.task.TaskState; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata; import org.apache.pinot.common.utils.LLCSegmentName; -import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor; +import org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator; import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator; import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtils; import org.apache.pinot.core.common.MinionConstants; @@ -77,19 +77,12 @@ import org.slf4j.LoggerFactory; * - A PinotTaskConfig is created, with segment information, execution window, and any config specific to the task */ @TaskGenerator -public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerator { +public class RealtimeToOfflineSegmentsTaskGenerator extends BaseTaskGenerator { private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskGenerator.class); private static final String DEFAULT_BUCKET_PERIOD = "1d"; private static final String DEFAULT_BUFFER_PERIOD = "2d"; - private ClusterInfoAccessor _clusterInfoAccessor; - - @Override - public void init(ClusterInfoAccessor clusterInfoAccessor) { - _clusterInfoAccessor = clusterInfoAccessor; - } - @Override public String getTaskType() { return RealtimeToOfflineSegmentsTask.TASK_TYPE; diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java index 16424d1..33dd85a 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java @@ -32,12 +32,10 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.helix.task.JobConfig; import org.apache.helix.task.TaskState; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.segment.generation.SegmentGenerationUtils; -import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor; -import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator; +import org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator; import org.apache.pinot.core.common.MinionConstants; import org.apache.pinot.core.minion.PinotTaskConfig; import org.apache.pinot.spi.annotations.minion.TaskGenerator; @@ -89,39 +87,17 @@ import org.slf4j.LoggerFactory; * */ @TaskGenerator -public class SegmentGenerationAndPushTaskGenerator implements PinotTaskGenerator { +public class SegmentGenerationAndPushTaskGenerator extends BaseTaskGenerator { private static final Logger LOGGER = LoggerFactory.getLogger(SegmentGenerationAndPushTaskGenerator.class); private static final BatchConfigProperties.SegmentPushType DEFAULT_SEGMENT_PUSH_TYPE = BatchConfigProperties.SegmentPushType.TAR; - private ClusterInfoAccessor _clusterInfoAccessor; - - @Override - public void init(ClusterInfoAccessor clusterInfoAccessor) { - _clusterInfoAccessor = clusterInfoAccessor; - } - @Override public String getTaskType() { return MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE; } @Override - public int getNumConcurrentTasksPerInstance() { - String numConcurrentTasksPerInstanceStr = _clusterInfoAccessor - .getClusterConfig(MinionConstants.SegmentGenerationAndPushTask.CONFIG_NUMBER_CONCURRENT_TASKS_PER_INSTANCE); - if (numConcurrentTasksPerInstanceStr != null) { - try { - return Integer.parseInt(numConcurrentTasksPerInstanceStr); - } catch (Exception e) { - LOGGER.error("Failed to parse cluster config: {}", - MinionConstants.SegmentGenerationAndPushTask.CONFIG_NUMBER_CONCURRENT_TASKS_PER_INSTANCE, e); - } - } - return JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE; - } - - @Override public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) { List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org