This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new fe7247fb96 create task queue as needed for adhoc task (#8540) fe7247fb96 is described below commit fe7247fb96c6627eb44f30d6199ffb6fdf5f835f Author: Xiaobing <61892277+klsi...@users.noreply.github.com> AuthorDate: Thu Apr 14 01:09:59 2022 -0700 create task queue as needed for adhoc task (#8540) Co-authored-by: Xiaobing Li <xiaob...@startree.ai> --- .../helix/core/minion/PinotTaskManager.java | 5 +- ...mentGenerationMinionClusterIntegrationTest.java | 139 +++++++++++++++++++++ 2 files changed, 141 insertions(+), 3 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java index 308ba5ca63..522cc5e077 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java @@ -144,6 +144,8 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { } String minionInstanceTag = taskConfigs.getOrDefault("minionInstanceTag", CommonConstants.Helix.UNTAGGED_MINION_INSTANCE); + _helixTaskResourceManager.ensureTaskQueueExists(taskType); + addTaskTypeMetricsUpdaterIfNeeded(taskType); String parentTaskName = _helixTaskResourceManager.getParentTaskName(taskType, taskName); TaskState taskState = _helixTaskResourceManager.getTaskState(parentTaskName); if (taskState != null) { @@ -175,9 +177,6 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { throw new UnknownTaskTypeException( "Task type: " + taskType + " is not registered, cannot enable it for table: " + tableName); } - _helixTaskResourceManager.ensureTaskQueueExists(taskType); - addTaskTypeMetricsUpdaterIfNeeded(taskType); - // responseMap holds the table to task name mapping. Map<String, String> responseMap = new HashMap<>(); for (String tableNameWithType : tableNameWithTypes) { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentGenerationMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentGenerationMinionClusterIntegrationTest.java new file mode 100644 index 0000000000..374bb2d617 --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentGenerationMinionClusterIntegrationTest.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests; + +import com.fasterxml.jackson.databind.JsonNode; +import java.io.File; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.config.task.AdhocTaskConfig; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.apache.pinot.util.TestUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; + + +public class SegmentGenerationMinionClusterIntegrationTest extends BaseClusterIntegrationTest { + private static final Logger LOGGER = LoggerFactory.getLogger(SegmentGenerationMinionClusterIntegrationTest.class); + + @BeforeClass + public void setUp() + throws Exception { + TestUtils.ensureDirectoriesExistAndEmpty(_tempDir); + + startZk(); + startController(); + startBroker(); + startServer(); + startMinion(); + } + + @AfterClass + public void tearDown() { + try { + stopMinion(); + stopServer(); + stopBroker(); + stopController(); + stopZk(); + } finally { + FileUtils.deleteQuietly(_tempDir); + } + } + + @Test + public void testAdhocSegmentGenerationAndPushTask() + throws Exception { + String tableName = "myTable"; + String tableNameWithType = tableName + "_OFFLINE"; + addSchemaAndTableConfig(tableName); + + File inputDir = new File(_tempDir, tableName); + int rowCnt = prepInputFiles(inputDir, 7, 10); + assertEquals(rowCnt, 70); + + Map<String, String> taskConfigs = new HashMap<>(); + taskConfigs.put(BatchConfigProperties.INPUT_DIR_URI, inputDir.getAbsolutePath()); + taskConfigs.put(BatchConfigProperties.INPUT_FORMAT, "csv"); + AdhocTaskConfig adhocTaskConfig = + new AdhocTaskConfig("SegmentGenerationAndPushTask", tableNameWithType, null, taskConfigs); + + String url = _controllerBaseApiUrl + "/tasks/execute"; + TestUtils.waitForCondition(aVoid -> { + try { + if (getTotalDocs(tableName) < rowCnt) { + // To avoid the NoTaskScheduledException after all files are ingested. + sendPostRequest(url, JsonUtils.objectToString(adhocTaskConfig), + Collections.singletonMap("accept", "application/json")); + } + return getTotalDocs(tableName) == rowCnt; + } catch (Exception e) { + LOGGER.error("Failed to get expected totalDocs: " + rowCnt, e); + return false; + } + }, 5000L, 600_000L, "Failed to load " + rowCnt + " documents", true); + JsonNode result = postQuery("SELECT COUNT(*) FROM " + tableName, _brokerBaseApiUrl); + // One segment per file. + assertEquals(result.get("numSegmentsQueried").asInt(), 7); + } + + private void addSchemaAndTableConfig(String tableName) + throws Exception { + addSchema(new Schema.SchemaBuilder().setSchemaName(tableName).addSingleValueDimension("id", FieldSpec.DataType.INT) + .addSingleValueDimension("name", FieldSpec.DataType.STRING).build()); + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName).build(); + sendPostRequest(_controllerRequestURLBuilder.forTableCreate(), tableConfig.toString(), + BasicAuthTestUtils.AUTH_HEADER); + } + + private int prepInputFiles(File inputDir, int fileNum, int rowsPerFile) + throws Exception { + int rowCnt = 0; + for (int i = 0; i < fileNum; i++) { + File csvFile = new File(inputDir, String.format("tempFile_%05d.csv", i)); + FileUtils.write(csvFile, "id,name\n", false); + for (int j = 0; j < rowsPerFile; j++) { + FileUtils.write(csvFile, String.format("%d,n%d\n", rowCnt, rowCnt), true); + rowCnt++; + } + } + return rowCnt; + } + + private int getTotalDocs(String tableName) + throws Exception { + String query = "SELECT COUNT(*) FROM " + tableName; + JsonNode response = postQuery(query, _brokerBaseApiUrl); + JsonNode resTbl = response.get("resultTable"); + return (resTbl == null) ? 0 : resTbl.get("rows").get(0).get(0).asInt(); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org