This is an automated email from the ASF dual-hosted git repository. manishswaminathan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 9546c533a1 Prevent task generation when task queue is in STOPPED state (#14257) 9546c533a1 is described below commit 9546c533a11f2700ab5e510760750b1869e9f87b Author: Ragesh Rajagopalan <ragesh.rajagopa...@gmail.com> AuthorDate: Wed Oct 23 10:20:25 2024 -0700 Prevent task generation when task queue is in STOPPED state (#14257) --- .../helix/core/minion/PinotTaskManager.java | 29 ++++-- .../core/minion/PinotTaskManagerStatelessTest.java | 101 +++++++++++++++++++++ 2 files changed, 124 insertions(+), 6 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java index f230c5ecce..94facbc377 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java @@ -32,6 +32,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; +import java.util.stream.Collectors; import javax.annotation.Nullable; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -163,6 +164,9 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { taskConfigs.getOrDefault(MINION_INSTANCE_TAG_CONFIG, CommonConstants.Helix.UNTAGGED_MINION_INSTANCE); _helixTaskResourceManager.ensureTaskQueueExists(taskType); addTaskTypeMetricsUpdaterIfNeeded(taskType); + if (!isTaskSchedulable(taskType, List.of(tableName))) { + return new HashMap<>(); + } String parentTaskName = _helixTaskResourceManager.getParentTaskName(taskType, taskName); TaskState taskState = _helixTaskResourceManager.getTaskState(parentTaskName); if (taskState != null) { @@ -566,15 +570,13 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { String taskType = entry.getKey(); List<TableConfig> enabledTableConfigs = entry.getValue(); PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType); + List<String> enabledTables = + enabledTableConfigs.stream().map(TableConfig::getTableName).collect(Collectors.toList()); if (taskGenerator != null) { _helixTaskResourceManager.ensureTaskQueueExists(taskType); addTaskTypeMetricsUpdaterIfNeeded(taskType); tasksScheduled.put(taskType, scheduleTask(taskGenerator, enabledTableConfigs, isLeader, minionInstanceTag)); } else { - List<String> enabledTables = new ArrayList<>(enabledTableConfigs.size()); - for (TableConfig enabledTableConfig : enabledTableConfigs) { - enabledTables.add(enabledTableConfig.getTableName()); - } LOGGER.warn("Task type: {} is not registered, cannot enable it for tables: {}", taskType, enabledTables); tasksScheduled.put(taskType, null); } @@ -611,9 +613,14 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { @Nullable private List<String> scheduleTask(PinotTaskGenerator taskGenerator, List<TableConfig> enabledTableConfigs, boolean isLeader, @Nullable String minionInstanceTagForTask) { - LOGGER.info("Trying to schedule task type: {}, isLeader: {}", taskGenerator.getTaskType(), isLeader); - Map<String, List<PinotTaskConfig>> minionInstanceTagToTaskConfigs = new HashMap<>(); String taskType = taskGenerator.getTaskType(); + List<String> enabledTables = + enabledTableConfigs.stream().map(TableConfig::getTableName).collect(Collectors.toList()); + LOGGER.info("Trying to schedule task type: {}, for tables: {}, isLeader: {}", taskType, enabledTables, isLeader); + if (!isTaskSchedulable(taskType, enabledTables)) { + return null; + } + Map<String, List<PinotTaskConfig>> minionInstanceTagToTaskConfigs = new HashMap<>(); for (TableConfig tableConfig : enabledTableConfigs) { String tableName = tableConfig.getTableName(); try { @@ -745,4 +752,14 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { _taskTypeMetricsUpdaterMap.put(taskType, taskTypeMetricsUpdater); } } + + private boolean isTaskSchedulable(String taskType, List<String> tables) { + TaskState taskQueueState = _helixTaskResourceManager.getTaskQueueState(taskType); + if (TaskState.STOPPED.equals(taskQueueState) || TaskState.STOPPING.equals(taskQueueState)) { + LOGGER.warn("Task queue is in state: {}. Tasks won't be created for taskType: {} and tables: {}. Resume task " + + "queue before attempting to create tasks.", taskQueueState.name(), taskType, tables); + return false; + } + return true; + } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerStatelessTest.java index 4abc8bdaa6..e4405502ef 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerStatelessTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerStatelessTest.java @@ -19,14 +19,19 @@ package org.apache.pinot.controller.helix.core.minion; import com.google.common.collect.ImmutableMap; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Function; import java.util.function.Predicate; +import org.apache.helix.task.TaskState; import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.helix.ControllerTest; +import org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator; import org.apache.pinot.controller.util.TableSizeReader; import org.apache.pinot.core.common.MinionConstants; +import org.apache.pinot.core.minion.PinotTaskConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableTaskConfig; import org.apache.pinot.spi.config.table.TableType; @@ -83,6 +88,7 @@ public class PinotTaskManagerStatelessTest extends ControllerTest { startController(properties); addFakeBrokerInstancesToAutoJoinHelixCluster(1, true); addFakeServerInstancesToAutoJoinHelixCluster(1, true); + addFakeMinionInstancesToAutoJoinHelixCluster(1); Schema schema = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME) .addSingleValueDimension("myMap", FieldSpec.DataType.STRING) .addSingleValueDimension("myMapStr", FieldSpec.DataType.STRING) @@ -109,6 +115,101 @@ public class PinotTaskManagerStatelessTest extends ControllerTest { stopController(); } + private void testValidateTaskGeneration(Function<PinotTaskManager, Void> validateFunction) + throws Exception { + Map<String, Object> properties = getDefaultControllerConfiguration(); + properties.put(ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_MANAGER_SCHEDULER_ENABLED, true); + startController(properties); + addFakeBrokerInstancesToAutoJoinHelixCluster(1, true); + addFakeServerInstancesToAutoJoinHelixCluster(1, true); + addFakeMinionInstancesToAutoJoinHelixCluster(1); + Schema schema = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME) + .addSingleValueDimension("myMap", FieldSpec.DataType.STRING) + .addSingleValueDimension("myMapStr", FieldSpec.DataType.STRING) + .addSingleValueDimension("complexMapStr", FieldSpec.DataType.STRING).build(); + addSchema(schema); + PinotTaskManager taskManager = _controllerStarter.getTaskManager(); + Scheduler scheduler = taskManager.getScheduler(); + assertNotNull(scheduler); + + String segmentGenerationAndPushTask = MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE; + + // Add Table + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTaskConfig( + new TableTaskConfig( + ImmutableMap.of(segmentGenerationAndPushTask, ImmutableMap.of("schedule", "0 */10 * ? * * *")))).build(); + waitForEVToDisappear(tableConfig.getTableName()); + addTableConfig(tableConfig); + waitForJobGroupNames(taskManager, jgn -> jgn.size() == 1 && jgn.contains(segmentGenerationAndPushTask), + "JobGroupNames should have SegmentGenerationAndPushTask only"); + validateJob(segmentGenerationAndPushTask, "0 */10 * ? * * *"); + + // Ensure task queue exists + PinotHelixTaskResourceManager taskResourceManager = _controllerStarter.getHelixTaskResourceManager(); + taskResourceManager.ensureTaskQueueExists(segmentGenerationAndPushTask); + + // Register the task generator + taskManager.registerTaskGenerator(new BaseTaskGenerator() { + @Override + public String getTaskType() { + return segmentGenerationAndPushTask; + } + + @Override + public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) { + // test validates that this method never gets called as the task queue is in stopped state + return List.of(new PinotTaskConfig(segmentGenerationAndPushTask, new HashMap<>())); + } + }); + + // Stop the task queue + taskResourceManager.stopTaskQueue(segmentGenerationAndPushTask); + + // Assert the task queue state + TestUtils.waitForCondition(aVoid -> { + TaskState taskQueueState = taskResourceManager.getTaskQueueState(segmentGenerationAndPushTask); + return TaskState.STOPPED.equals(taskQueueState); + }, TIMEOUT_IN_MS, "task queue state was not in STOPPED state within ten seconds."); + + // Exercise the test + validateFunction.apply(taskManager); + + // Drop table + dropOfflineTable(RAW_TABLE_NAME); + waitForJobGroupNames(_controllerStarter.getTaskManager(), List::isEmpty, "JobGroupNames should be empty"); + + stopFakeInstances(); + stopController(); + } + + @Test + public void testPinotTaskManagerScheduleTaskWithStoppedTaskQueue() + throws Exception { + testValidateTaskGeneration(taskManager -> { + // Validate schedule tasks for table when task queue is in stopped state + List<String> taskIDs = taskManager.scheduleTaskForTable("SegmentGenerationAndPushTask", "myTable", null); + assertNull(taskIDs); + return null; + }); + } + + @Test + public void testPinotTaskManagerCreateTaskWithStoppedTaskQueue() + throws Exception { + testValidateTaskGeneration(taskManager -> { + Map<String, String> taskMap; + try { + // Validate task creation for table when task queue is in stopped state + taskMap = taskManager.createTask("SegmentGenerationAndPushTask", "myTable", "myTaskName", new HashMap<>()); + } catch (Exception e) { + throw new RuntimeException(e); + } + assertNotNull(taskMap); + assertEquals(taskMap.size(), 0); + return null; + }); + } + @Test public void testPinotTaskManagerSchedulerWithUpdate() throws Exception { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org