This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new f17be35 Adding cluster config to config number of concurrent tasks per instance for minion task: SegmentGenerationAndPushTaskGenerator (#6468) f17be35 is described below commit f17be35c348686c8041f0c3e2aef87f62e2cc97a Author: Xiang Fu <fx19880...@gmail.com> AuthorDate: Thu Jan 21 15:20:46 2021 -0800 Adding cluster config to config number of concurrent tasks per instance for minion task: SegmentGenerationAndPushTaskGenerator (#6468) * Adding cluster config to config minion task: SegmentGenerationAndPushTaskGenerator number of concurrent tasks per instance * Switch to real controller test * Address comments --- .../helix/ControllerRequestURLBuilder.java | 4 ++ .../helix/core/minion/ClusterInfoAccessor.java | 16 +++++ .../SegmentGenerationAndPushTaskGenerator.java | 16 +++++ .../SegmentGenerationAndPushTaskGeneratorTest.java | 80 ++++++++++++++++++++++ .../apache/pinot/core/common/MinionConstants.java | 2 + 5 files changed, 118 insertions(+) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java index c77ae3b..f938e0b 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java @@ -332,4 +332,8 @@ public class ControllerRequestURLBuilder { .collect(Collectors.joining(",", "{", "}")); return forIngestFromURI(tableNameWithType, batchConfigMapStr, sourceURIStr); } + + public String forClusterConfigs() { + return StringUtil.join("/", _baseUrl, "cluster/configs"); + } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java index 
8d3db71..7b1ff36 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java @@ -18,9 +18,12 @@ */ package org.apache.pinot.controller.helix.core.minion; +import java.util.Collections; import java.util.List; import java.util.Map; import javax.annotation.Nullable; +import org.apache.helix.model.HelixConfigScope; +import org.apache.helix.model.builder.HelixConfigScopeBuilder; import org.apache.helix.task.TaskState; import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata; @@ -157,4 +160,17 @@ public class ClusterInfoAccessor { public String getVipUrl() { return _controllerConf.generateVipUrl(); } + + /** + * Get the cluster config for a given config name, return null if not found. + * + * @return cluster config + */ + public String getClusterConfig(String configName) { + HelixConfigScope helixConfigScope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER) + .forCluster(_pinotHelixResourceManager.getHelixClusterName()).build(); + Map<String, String> configMap = + _pinotHelixResourceManager.getHelixAdmin().getConfig(helixConfigScope, Collections.singletonList(configName)); + return configMap != null ? 
configMap.get(configName) : null; + } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java index 41b7740..3ea3d31 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java @@ -34,6 +34,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.helix.task.JobConfig; import org.apache.helix.task.TaskState; import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata; import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor; @@ -104,6 +105,21 @@ public class SegmentGenerationAndPushTaskGenerator implements PinotTaskGenerator } @Override + public int getNumConcurrentTasksPerInstance() { + String numConcurrentTasksPerInstanceStr = _clusterInfoAccessor + .getClusterConfig(MinionConstants.SegmentGenerationAndPushTask.CONFIG_NUMBER_CONCURRENT_TASKS_PER_INSTANCE); + if (numConcurrentTasksPerInstanceStr != null) { + try { + return Integer.parseInt(numConcurrentTasksPerInstanceStr); + } catch (Exception e) { + LOGGER.error("Failed to parse cluster config: {}", + MinionConstants.SegmentGenerationAndPushTask.CONFIG_NUMBER_CONCURRENT_TASKS_PER_INSTANCE, e); + } + } + return JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE; + } + + @Override public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) { List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>(); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGeneratorTest.java 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGeneratorTest.java new file mode 100644 index 0000000..5a81ef1 --- /dev/null +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGeneratorTest.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.controller.helix.core.minion.generator; + +import java.util.Collections; +import java.util.Map; +import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.helix.ControllerTest; +import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor; +import org.apache.pinot.core.common.MinionConstants; +import org.apache.pinot.spi.utils.JsonUtils; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +/** + * Tests for {@link SegmentGenerationAndPushTaskGenerator} + */ +public class SegmentGenerationAndPushTaskGeneratorTest extends ControllerTest { + SegmentGenerationAndPushTaskGenerator _generator; + + @BeforeClass + public void setup() { + int zkPort = 2171; + startZk(zkPort); + Map<String, Object> properties = getDefaultControllerConfiguration(); + properties.put(ControllerConf.ZK_STR, "localhost:" + zkPort); + properties.put(ControllerConf.HELIX_CLUSTER_NAME, SegmentGenerationAndPushTaskGeneratorTest.class.getSimpleName()); + properties.put(ControllerConf.CONTROLLER_PORT, 28998); + startController(properties); + + ClusterInfoAccessor clusterInfoAccessor = _controllerStarter.getTaskManager().getClusterInfoAccessor(); + _generator = new SegmentGenerationAndPushTaskGenerator(); + _generator.init(clusterInfoAccessor); + } + + @AfterClass + public void tearDown() { + stopController(); + stopZk(); + } + + @Test + public void testRealCluster() + throws Exception { + // Default is 1 + Assert.assertEquals(_generator.getNumConcurrentTasksPerInstance(), 1); + + // Set config to 5 + String request = JsonUtils.objectToString(Collections + .singletonMap(MinionConstants.SegmentGenerationAndPushTask.CONFIG_NUMBER_CONCURRENT_TASKS_PER_INSTANCE, "5")); + sendPostRequest(_controllerRequestURLBuilder.forClusterConfigs(), request); + Assert.assertEquals(_generator.getNumConcurrentTasksPerInstance(), 5); + + // Set 
config to invalid and should still get 1 + request = JsonUtils.objectToString(Collections + .singletonMap(MinionConstants.SegmentGenerationAndPushTask.CONFIG_NUMBER_CONCURRENT_TASKS_PER_INSTANCE, + "abcd")); + sendPostRequest(_controllerRequestURLBuilder.forClusterConfigs(), request); + Assert.assertEquals(_generator.getNumConcurrentTasksPerInstance(), 1); + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java index 546a9fb..d9d1a9d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java @@ -92,6 +92,8 @@ public class MinionConstants { // Generate segment and push to controller based on batch ingestion configs public static class SegmentGenerationAndPushTask { public static final String TASK_TYPE = "SegmentGenerationAndPushTask"; + public static final String CONFIG_NUMBER_CONCURRENT_TASKS_PER_INSTANCE = + "SegmentGenerationAndPushTask.numConcurrentTasksPerInstance"; } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org