This is an automated email from the ASF dual-hosted git repository. manishswaminathan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new eb7489c453 Track the actor that triggers the minion task (#14829) eb7489c453 is described below commit eb7489c453e2b82b8199670bbea931e52ae609db Author: Shounak kulkarni <shounakmk...@gmail.com> AuthorDate: Thu Feb 13 09:55:11 2025 +0530 Track the actor that triggers the minion task (#14829) * Track the actor that triggered the task * add java docs * simplified TaskSchedulingContext * Track the actor that triggered the task * add java docs * simplified TaskSchedulingContext * variable renames * fixes --- .../api/resources/PinotTaskRestletResource.java | 41 +++--- .../helix/core/minion/CronJobScheduleJob.java | 8 +- .../core/minion/PinotHelixTaskResourceManager.java | 31 +++- .../helix/core/minion/PinotTaskManager.java | 156 ++++++++++++--------- .../helix/core/minion/TaskSchedulingContext.java | 135 ++++++++++++++++++ .../helix/core/minion/TaskSchedulingInfo.java | 56 ++++++++ .../src/main/resources/app/pages/SubTaskDetail.tsx | 3 + .../src/main/resources/app/pages/TaskDetail.tsx | 3 + .../core/minion/PinotTaskManagerStatelessTest.java | 12 +- .../integration/tests/MinionTaskTestUtils.java | 23 ++- .../MergeRollupMinionClusterIntegrationTest.java | 55 +++++--- .../tests/PurgeMinionClusterIntegrationTest.java | 45 ++++-- ...fflineSegmentsMinionClusterIntegrationTest.java | 19 ++- ...RefreshSegmentMinionClusterIntegrationTest.java | 44 ++++-- .../tests/SimpleMinionClusterIntegrationTest.java | 8 +- .../integration/tests/TlsIntegrationTest.java | 3 +- .../tests/UpsertTableIntegrationTest.java | 10 +- .../tests/UrlAuthRealtimeIntegrationTest.java | 3 +- .../apache/pinot/spi/utils/CommonConstants.java | 4 + 19 files changed, 496 insertions(+), 163 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java index 24cd444dc5..d76ea4f3a2 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java @@ -34,6 +34,7 @@ import java.io.IOException; import java.net.URI; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -80,12 +81,15 @@ import org.apache.pinot.controller.api.exception.UnknownTaskTypeException; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager; import org.apache.pinot.controller.helix.core.minion.PinotTaskManager; +import org.apache.pinot.controller.helix.core.minion.TaskSchedulingContext; +import org.apache.pinot.controller.helix.core.minion.TaskSchedulingInfo; import org.apache.pinot.controller.util.CompletionServiceHelper; import org.apache.pinot.core.auth.Actions; import org.apache.pinot.core.auth.Authorize; import org.apache.pinot.core.auth.TargetType; import org.apache.pinot.core.minion.PinotTaskConfig; import org.apache.pinot.spi.config.task.AdhocTaskConfig; +import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.JsonUtils; import org.glassfish.grizzly.http.server.Request; import org.glassfish.jersey.server.ManagedAsync; @@ -646,29 +650,26 @@ public class PinotTaskRestletResource { Map<String, String> response = new HashMap<>(); List<String> generationErrors = new ArrayList<>(); List<String> schedulingErrors = new ArrayList<>(); + TaskSchedulingContext context = new TaskSchedulingContext() + .setTriggeredBy(CommonConstants.TaskTriggers.MANUAL_TRIGGER.name()) + .setMinionInstanceTag(minionInstanceTag) + .setLeader(false); if (taskType != null) { - // Schedule task for the given task type - PinotTaskManager.TaskSchedulingInfo taskInfos = tableName != null - ? _pinotTaskManager.scheduleTaskForTable(taskType, DatabaseUtils.translateTableName(tableName, headers), - minionInstanceTag) - : _pinotTaskManager.scheduleTaskForDatabase(taskType, database, minionInstanceTag); - response.put(taskType, StringUtils.join(taskInfos.getScheduledTaskNames(), ',')); - generationErrors.addAll(taskInfos.getGenerationErrors()); - schedulingErrors.addAll(taskInfos.getSchedulingErrors()); + context.setTasksToSchedule(Collections.singleton(taskType)); + } + if (tableName != null) { + context.setTablesToSchedule(Collections.singleton(DatabaseUtils.translateTableName(tableName, headers))); } else { - // Schedule tasks for all task types - Map<String, PinotTaskManager.TaskSchedulingInfo> allTaskInfos = tableName != null - ? _pinotTaskManager.scheduleAllTasksForTable(DatabaseUtils.translateTableName(tableName, headers), - minionInstanceTag) - : _pinotTaskManager.scheduleAllTasksForDatabase(database, minionInstanceTag); - allTaskInfos.forEach((key, value) -> { - if (value.getScheduledTaskNames() != null) { - response.put(key, String.join(",", value.getScheduledTaskNames())); - } - generationErrors.addAll(value.getGenerationErrors()); - schedulingErrors.addAll(value.getSchedulingErrors()); - }); + context.setDatabasesToSchedule(Collections.singleton(database)); } + Map<String, TaskSchedulingInfo> allTaskInfos = _pinotTaskManager.scheduleTasks(context); + allTaskInfos.forEach((key, value) -> { + if (value.getScheduledTaskNames() != null) { + response.put(key, String.join(",", value.getScheduledTaskNames())); + } + generationErrors.addAll(value.getGenerationErrors()); + schedulingErrors.addAll(value.getSchedulingErrors()); + }); response.put(GENERATION_ERRORS_KEY, String.join(",", generationErrors)); response.put(SCHEDULING_ERRORS_KEY, String.join(",", schedulingErrors)); return response; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/CronJobScheduleJob.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/CronJobScheduleJob.java index f9b250b2bc..8a4751196f 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/CronJobScheduleJob.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/CronJobScheduleJob.java @@ -18,11 +18,13 @@ */ package org.apache.pinot.controller.helix.core.minion; +import java.util.Collections; import java.util.Date; import java.util.concurrent.TimeUnit; import org.apache.pinot.common.metrics.ControllerMeter; import org.apache.pinot.common.metrics.ControllerTimer; import org.apache.pinot.controller.LeadControllerManager; +import org.apache.pinot.spi.utils.CommonConstants; import org.quartz.Job; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; @@ -64,8 +66,12 @@ public class CronJobScheduleJob implements Job { ControllerMeter.CRON_SCHEDULER_JOB_SKIPPED, 1L); return; } + TaskSchedulingContext context = new TaskSchedulingContext() + .setTablesToSchedule(Collections.singleton(table)) + .setTasksToSchedule(Collections.singleton(taskType)) + .setTriggeredBy(CommonConstants.TaskTriggers.CRON_TRIGGER.name()); long jobStartTime = System.currentTimeMillis(); - pinotTaskManager.scheduleTaskForTable(taskType, table, null); + pinotTaskManager.scheduleTasks(context); LOGGER.info("Finished CronJob: table - {}, task - {}, next runtime is {}", table, taskType, jobExecutionContext.getNextFireTime()); pinotTaskManager.getControllerMetrics().addTimedTableValue(PinotTaskManager.getCronJobName(table, taskType), diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java index 87580c30b0..bbbb3fcee2 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java @@ -876,6 +876,11 @@ public class PinotHelixTaskResourceManager { if (jobFinishTimeMs > 0) { taskDebugInfo.setFinishTime(DateTimeUtils.epochToDefaultDateFormat(jobFinishTimeMs)); } + String triggeredBy = jobConfig.getTaskConfigMap().values().stream().findFirst() + .map(TaskConfig::getConfigMap) + .map(taskConfigs -> taskConfigs.get(PinotTaskManager.TRIGGERED_BY)) + .orElse(""); + taskDebugInfo.setTriggeredBy(triggeredBy); Set<Integer> partitionSet = jobContext.getPartitionSet(); TaskCount subtaskCount = new TaskCount(); for (int partition : partitionSet) { @@ -890,6 +895,7 @@ public class PinotHelixTaskResourceManager { String taskIdForPartition = jobContext.getTaskIdForPartition(partition); subtaskDebugInfo.setTaskId(taskIdForPartition); subtaskDebugInfo.setState(partitionState); + subtaskDebugInfo.setTriggeredBy(triggeredBy); long subtaskStartTimeMs = jobContext.getPartitionStartTime(partition); if (subtaskStartTimeMs > 0) { subtaskDebugInfo.setStartTime(DateTimeUtils.epochToDefaultDateFormat(subtaskStartTimeMs)); @@ -987,7 +993,8 @@ public class PinotHelixTaskResourceManager { return MinionTaskMetadataUtils.getAllTaskMetadataLastUpdateTimeMs(propertyStore); } - @JsonPropertyOrder({"taskState", "subtaskCount", "startTime", "executionStartTime", "finishTime", "subtaskInfos"}) + @JsonPropertyOrder({"taskState", "subtaskCount", "startTime", "executionStartTime", "finishTime", "triggeredBy", + "subtaskInfos"}) @JsonInclude(JsonInclude.Include.NON_NULL) public static class TaskDebugInfo { // Time at which the task (which may have multiple subtasks) got created. @@ -998,6 +1005,7 @@ public class PinotHelixTaskResourceManager { private String _finishTime; private TaskState _taskState; private TaskCount _subtaskCount; + private String _triggeredBy; private List<SubtaskDebugInfo> _subtaskInfos; public TaskDebugInfo() { @@ -1046,6 +1054,15 @@ public class PinotHelixTaskResourceManager { return _taskState; } + public String getTriggeredBy() { + return _triggeredBy; + } + + public TaskDebugInfo setTriggeredBy(String triggeredBy) { + _triggeredBy = triggeredBy; + return this; + } + public TaskCount getSubtaskCount() { return _subtaskCount; } @@ -1055,7 +1072,7 @@ public class PinotHelixTaskResourceManager { } } - @JsonPropertyOrder({"taskId", "state", "startTime", "finishTime", "participant", "info", "taskConfig"}) + @JsonPropertyOrder({"taskId", "state", "startTime", "finishTime", "participant", "info", "triggeredBy", "taskConfig"}) @JsonInclude(JsonInclude.Include.NON_NULL) public static class SubtaskDebugInfo { private String _taskId; @@ -1064,6 +1081,7 @@ public class PinotHelixTaskResourceManager { private String _finishTime; private String _participant; private String _info; + private String _triggeredBy; private PinotTaskConfig _taskConfig; public SubtaskDebugInfo() { @@ -1121,6 +1139,15 @@ public class PinotHelixTaskResourceManager { return _info; } + public String getTriggeredBy() { + return _triggeredBy; + } + + public SubtaskDebugInfo setTriggeredBy(String triggeredBy) { + _triggeredBy = triggeredBy; + return this; + } + public PinotTaskConfig getTaskConfig() { return _taskConfig; } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java index 93002f9100..9f1f938187 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java @@ -22,6 +22,7 @@ import com.google.common.base.Preconditions; import java.io.PrintWriter; import java.io.StringWriter; import java.util.ArrayList; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.HashSet; @@ -94,6 +95,7 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { private static final String TABLE_CONFIG_PARENT_PATH = "/CONFIGS/TABLE"; private static final String TABLE_CONFIG_PATH_PREFIX = "/CONFIGS/TABLE/"; private static final String TASK_QUEUE_PATH_PATTERN = "/TaskRebalancer/TaskQueue_%s/Context"; + public static final String TRIGGERED_BY = "triggeredBy"; private final PinotHelixTaskResourceManager _helixTaskResourceManager; private final ClusterInfoAccessor _clusterInfoAccessor; @@ -208,6 +210,8 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { LOGGER.warn("No ad-hoc task generated for task type: {}", taskType); continue; } + pinotTaskConfigs.forEach(pinotTaskConfig -> pinotTaskConfig.getConfigs() + .computeIfAbsent(TRIGGERED_BY, k -> CommonConstants.TaskTriggers.ADHOC_TRIGGER.name())); LOGGER.info("Submitting ad-hoc task for task type: {} with task configs: {}", taskType, pinotTaskConfigs); _controllerMetrics.addMeteredTableValue(taskType, ControllerMeter.NUMBER_ADHOC_TASKS_SUBMITTED, 1); responseMap.put(tableNameWithType, @@ -488,8 +492,11 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { * It might be called from the non-leader controller. * Returns a map from the task type to the {@link TaskSchedulingInfo} of tasks scheduled. */ + @Deprecated(forRemoval = true) public synchronized Map<String, TaskSchedulingInfo> scheduleAllTasksForAllTables(@Nullable String minionInstanceTag) { - return scheduleTasks(_pinotHelixResourceManager.getAllTables(), false, minionInstanceTag); + TaskSchedulingContext context = new TaskSchedulingContext() + .setMinionInstanceTag(minionInstanceTag); + return scheduleTasks(context); } /** @@ -497,9 +504,13 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { * It might be called from the non-leader controller. * Returns a map from the task type to the {@link TaskSchedulingInfo} of tasks scheduled. */ + @Deprecated(forRemoval = true) public synchronized Map<String, TaskSchedulingInfo> scheduleAllTasksForDatabase(@Nullable String database, @Nullable String minionInstanceTag) { - return scheduleTasks(_pinotHelixResourceManager.getAllTables(database), false, minionInstanceTag); + TaskSchedulingContext context = new TaskSchedulingContext() + .setDatabasesToSchedule(Collections.singleton(database)) + .setMinionInstanceTag(minionInstanceTag); + return scheduleTasks(context); } /** @@ -507,9 +518,13 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { * It might be called from the non-leader controller. * Returns a map from the task type to the {@link TaskSchedulingInfo} of tasks scheduled. */ + @Deprecated(forRemoval = true) public synchronized Map<String, TaskSchedulingInfo> scheduleAllTasksForTable(String tableNameWithType, @Nullable String minionInstanceTag) { - return scheduleTasks(List.of(tableNameWithType), false, minionInstanceTag); + TaskSchedulingContext context = new TaskSchedulingContext() + .setTablesToSchedule(Collections.singleton(tableNameWithType)) + .setMinionInstanceTag(minionInstanceTag); + return scheduleTasks(context); } /** @@ -521,8 +536,12 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { * - list of task generation errors if any * - list of task scheduling errors if any */ + @Deprecated(forRemoval = true) public synchronized TaskSchedulingInfo scheduleTaskForAllTables(String taskType, @Nullable String minionInstanceTag) { - return scheduleTask(taskType, _pinotHelixResourceManager.getAllTables(), minionInstanceTag); + TaskSchedulingContext context = new TaskSchedulingContext() + .setTasksToSchedule(Collections.singleton(taskType)) + .setMinionInstanceTag(minionInstanceTag); + return scheduleTasks(context).get(taskType); } /** @@ -534,9 +553,14 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { * - list of task generation errors if any * - list of task scheduling errors if any */ + @Deprecated(forRemoval = true) public synchronized TaskSchedulingInfo scheduleTaskForDatabase(String taskType, @Nullable String database, @Nullable String minionInstanceTag) { - return scheduleTask(taskType, _pinotHelixResourceManager.getAllTables(database), minionInstanceTag); + TaskSchedulingContext context = new TaskSchedulingContext() + .setTasksToSchedule(Collections.singleton(taskType)) + .setDatabasesToSchedule(Collections.singleton(database)) + .setMinionInstanceTag(minionInstanceTag); + return scheduleTasks(context).get(taskType); } /** @@ -548,27 +572,64 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { * - list of task generation errors if any * - list of task scheduling errors if any */ + @Deprecated(forRemoval = true) public synchronized TaskSchedulingInfo scheduleTaskForTable(String taskType, String tableNameWithType, @Nullable String minionInstanceTag) { - return scheduleTask(taskType, List.of(tableNameWithType), minionInstanceTag); + TaskSchedulingContext context = new TaskSchedulingContext() + .setTasksToSchedule(Collections.singleton(taskType)) + .setTablesToSchedule(Collections.singleton(tableNameWithType)) + .setMinionInstanceTag(minionInstanceTag); + return scheduleTasks(context).get(taskType); } /** * Helper method to schedule tasks (all task types) for the given tables that have the tasks enabled. * Returns a map from the task type to the {@link TaskSchedulingInfo} of the tasks scheduled. */ + @Deprecated(forRemoval = true) protected synchronized Map<String, TaskSchedulingInfo> scheduleTasks(List<String> tableNamesWithType, boolean isLeader, @Nullable String minionInstanceTag) { + TaskSchedulingContext context = new TaskSchedulingContext() + .setTablesToSchedule(new HashSet<>(tableNamesWithType)) + .setLeader(isLeader) + .setMinionInstanceTag(minionInstanceTag); + return scheduleTasks(context); + } + + /** + * Helper method to schedule tasks (all task types) for the given tables that have the tasks enabled. + * Returns a map from the task type to the {@link TaskSchedulingInfo} of the tasks scheduled. + */ + public synchronized Map<String, TaskSchedulingInfo> scheduleTasks(TaskSchedulingContext context) { _controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED, 1L); - // Scan all table configs to get the tables with tasks enabled Map<String, List<TableConfig>> enabledTableConfigMap = new HashMap<>(); - for (String tableNameWithType : tableNamesWithType) { + Set<String> targetTables = context.getTablesToSchedule(); + Set<String> targetDatabases = context.getDatabasesToSchedule(); + Set<String> tasksToSchedule = context.getTasksToSchedule(); + Set<String> consolidatedTables = new HashSet<>(); + if (targetTables != null) { + consolidatedTables.addAll(targetTables); + } + if (targetDatabases != null) { + targetDatabases.forEach(database -> + consolidatedTables.addAll(_pinotHelixResourceManager.getAllTables(database))); + } + for (String tableNameWithType : consolidatedTables.isEmpty() + ? _pinotHelixResourceManager.getAllTables() : consolidatedTables) { TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType); if (tableConfig != null && tableConfig.getTaskConfig() != null) { Set<String> enabledTaskTypes = tableConfig.getTaskConfig().getTaskTypeConfigsMap().keySet(); - for (String enabledTaskType : enabledTaskTypes) { - enabledTableConfigMap.computeIfAbsent(enabledTaskType, k -> new ArrayList<>()).add(tableConfig); + Set<String> validTasks; + if (tasksToSchedule == null || tasksToSchedule.isEmpty()) { + // if no specific task types are provided schedule for all tasks + validTasks = enabledTaskTypes; + } else { + validTasks = new HashSet<>(tasksToSchedule); + validTasks.retainAll(enabledTaskTypes); + } + for (String taskType : validTasks) { + enabledTableConfigMap.computeIfAbsent(taskType, k -> new ArrayList<>()).add(tableConfig); } } } @@ -579,13 +640,14 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { String taskType = entry.getKey(); List<TableConfig> enabledTableConfigs = entry.getValue(); PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType); - List<String> enabledTables = - enabledTableConfigs.stream().map(TableConfig::getTableName).collect(Collectors.toList()); if (taskGenerator != null) { _helixTaskResourceManager.ensureTaskQueueExists(taskType); addTaskTypeMetricsUpdaterIfNeeded(taskType); - tasksScheduled.put(taskType, scheduleTask(taskGenerator, enabledTableConfigs, isLeader, minionInstanceTag)); + tasksScheduled.put(taskType, scheduleTask(taskGenerator, enabledTableConfigs, context.isLeader(), + context.getMinionInstanceTag(), context.getTriggeredBy())); } else { + List<String> enabledTables = + enabledTableConfigs.stream().map(TableConfig::getTableName).collect(Collectors.toList()); String message = "Task type: " + taskType + " is not registered, cannot enable it for tables: " + enabledTables; LOGGER.warn(message); TaskSchedulingInfo taskSchedulingInfo = new TaskSchedulingInfo(); @@ -597,24 +659,16 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { return tasksScheduled; } + @Deprecated(forRemoval = true) protected synchronized TaskSchedulingInfo scheduleTask(String taskType, List<String> tables, @Nullable String minionInstanceTag) { - PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType); - Preconditions.checkState(taskGenerator != null, "Task type: %s is not registered", taskType); - - // Scan all table configs to get the tables with task enabled - List<TableConfig> enabledTableConfigs = new ArrayList<>(); - for (String tableNameWithType : tables) { - TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType); - if (tableConfig != null && tableConfig.getTaskConfig() != null && tableConfig.getTaskConfig() - .isTaskTypeEnabled(taskType)) { - enabledTableConfigs.add(tableConfig); - } - } - - _helixTaskResourceManager.ensureTaskQueueExists(taskType); - addTaskTypeMetricsUpdaterIfNeeded(taskType); - return scheduleTask(taskGenerator, enabledTableConfigs, false, minionInstanceTag); + Preconditions.checkState(_taskGeneratorRegistry.getAllTaskTypes().contains(taskType), + "Task type: %s is not registered", taskType); + TaskSchedulingContext context = new TaskSchedulingContext() + .setTablesToSchedule(new HashSet<>(tables)) + .setTasksToSchedule(Collections.singleton(taskType)) + .setMinionInstanceTag(minionInstanceTag); + return scheduleTasks(context).get(taskType); } /** @@ -626,8 +680,8 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { * - list of task scheduling errors if any */ protected TaskSchedulingInfo scheduleTask(PinotTaskGenerator taskGenerator, List<TableConfig> enabledTableConfigs, - boolean isLeader, @Nullable String minionInstanceTagForTask) { - TaskSchedulingInfo response = new TaskSchedulingInfo(); + boolean isLeader, @Nullable String minionInstanceTagForTask, String triggeredBy) { + TaskSchedulingInfo response = new TaskSchedulingInfo(); String taskType = taskGenerator.getTaskType(); List<String> enabledTables = enabledTableConfigs.stream().map(TableConfig::getTableName).collect(Collectors.toList()); @@ -693,6 +747,8 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { // This might lead to lot of logs, maybe sum it up and move outside the loop LOGGER.info("Submitting {} tasks for task type: {} to minionInstance: {} with task configs: {}", numTasks, taskType, minionInstanceTag, pinotTaskConfigs); + pinotTaskConfigs.forEach(pinotTaskConfig -> + pinotTaskConfig.getConfigs().computeIfAbsent(TRIGGERED_BY, k -> triggeredBy)); String submittedTaskName = _helixTaskResourceManager.submitTask(pinotTaskConfigs, minionInstanceTag, taskGenerator.getTaskTimeoutMs(), taskGenerator.getNumConcurrentTasksPerInstance(), taskGenerator.getMaxAttemptsPerTask()); @@ -718,7 +774,11 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { @Override protected void processTables(List<String> tableNamesWithType, Properties taskProperties) { - scheduleTasks(tableNamesWithType, true, null); + TaskSchedulingContext context = new TaskSchedulingContext() + .setLeader(true) + .setTriggeredBy(CommonConstants.TaskTriggers.CRON_TRIGGER.name()); + // cron schedule + scheduleTasks(context); } @Override @@ -781,36 +841,4 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { } return true; } - - public static class TaskSchedulingInfo { - private List<String> _scheduledTaskNames; - private final List<String> _generationErrors = new ArrayList<>(); - private final List<String> _schedulingErrors = new ArrayList<>(); - - @Nullable - public List<String> getScheduledTaskNames() { - return _scheduledTaskNames; - } - - public TaskSchedulingInfo setScheduledTaskNames(List<String> scheduledTaskNames) { - _scheduledTaskNames = scheduledTaskNames; - return this; - } - - public List<String> getGenerationErrors() { - return _generationErrors; - } - - public void addGenerationError(String generationError) { - _generationErrors.add(generationError); - } - - public List<String> getSchedulingErrors() { - return _schedulingErrors; - } - - public void addSchedulingError(String schedulingError) { - _schedulingErrors.add(schedulingError); - } - } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskSchedulingContext.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskSchedulingContext.java new file mode 100644 index 0000000000..2a685c2ab7 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskSchedulingContext.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.minion; + +import java.util.Set; + + +/** + * Wrapper class to manage all the inputs passed to schedule a task on minion. + * Tasks will be scheduled based on the combination on tables, databases and taskTypes passed + * <p>Y -> contains elements + * <p>N -> is null or empty + * <table> + * <tr> + * <th> tablesToSchedule </th> <th> databasesToSchedule </th> <th> tasksToSchedule </th> + * <th> {@link PinotTaskManager} behavior </th> + * </tr> + * <tr> + * <td> N </td> <td> N </td> <td> N </td> + * <td>schedule all the configured tasks on all tables</td> + * </tr> + * <tr> + * <td> Y </td> <td> N </td> <td> N </td> + * <td>schedule all the configured tasks on tables in tablesToSchedule</td> + * </tr> + * <tr> + * <td> N </td> <td> Y </td> <td> N </td> + * <td>schedule all the configured tasks on all tables under the databases in databasesToSchedule</td> + * </tr> + * <tr> + * <td> N </td> <td> N </td> <td> Y </td> + * <td>schedule tasksToSchedule on all tables</td> + * </tr> + * <tr> + * <td> N </td> <td> Y </td> <td> Y </td> + * <td>schedule tasksToSchedule on all tables under the databases in databasesToSchedule</td> + * </tr> + * <tr> + * <td> Y </td> <td> N </td> <td> Y </td> + * <td>schedule tasksToSchedule on tables in tablesToSchedule</td> + * </tr> + * <tr> + * <td> Y </td> <td> Y </td> <td> N </td> + * <td>schedule all the configured tasks on tables in tablesToSchedule + * and also on all tables under the databases in databasesToSchedule</td> + * </tr> + * <tr> + * <td> Y </td> <td> Y </td> <td> Y </td> + * <td>schedule tasksToSchedule on tables in tablesToSchedule and also + * on all tables under the databases in databasesToSchedule</td> + * </tr> + * </table> + * + * In short empty tasksToSchedule will schedule tasks for all types and + * empty tablesToSchedule and databasesToSchedule will schedule tasks for all tables + * + */ +public class TaskSchedulingContext { + private Set<String> _tablesToSchedule; + private Set<String> _tasksToSchedule; + private Set<String> _databasesToSchedule; + private String _triggeredBy; + private String _minionInstanceTag; + private boolean _isLeader; + + public Set<String> getTablesToSchedule() { + return _tablesToSchedule; + } + + public TaskSchedulingContext setTablesToSchedule(Set<String> tablesToSchedule) { + _tablesToSchedule = tablesToSchedule; + return this; + } + + public Set<String> getTasksToSchedule() { + return _tasksToSchedule; + } + + public TaskSchedulingContext setTasksToSchedule(Set<String> tasksToSchedule) { + _tasksToSchedule = tasksToSchedule; + return this; + } + + public Set<String> getDatabasesToSchedule() { + return _databasesToSchedule; + } + + public TaskSchedulingContext setDatabasesToSchedule(Set<String> databasesToSchedule) { + _databasesToSchedule = databasesToSchedule; + return this; + } + + public String getTriggeredBy() { + return _triggeredBy; + } + + public TaskSchedulingContext setTriggeredBy(String triggeredBy) { + _triggeredBy = triggeredBy; + return this; + } + + public String getMinionInstanceTag() { + return _minionInstanceTag; + } + + public TaskSchedulingContext setMinionInstanceTag(String minionInstanceTag) { + _minionInstanceTag = minionInstanceTag; + return this; + } + + public boolean isLeader() { + return _isLeader; + } + + public TaskSchedulingContext setLeader(boolean leader) { + _isLeader = leader; + return this; + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskSchedulingInfo.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskSchedulingInfo.java new file mode 100644 index 0000000000..2ffa11676f --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskSchedulingInfo.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.minion; + +import java.util.ArrayList; +import java.util.List; +import javax.annotation.Nullable; + + +public class TaskSchedulingInfo { + private List<String> _scheduledTaskNames; + private final List<String> _generationErrors = new ArrayList<>(); + private final List<String> _schedulingErrors = new ArrayList<>(); + + @Nullable + public List<String> getScheduledTaskNames() { + return _scheduledTaskNames; + } + + public TaskSchedulingInfo setScheduledTaskNames(List<String> scheduledTaskNames) { + _scheduledTaskNames = scheduledTaskNames; + return this; + } + + public List<String> getGenerationErrors() { + return _generationErrors; + } + + public void addGenerationError(String generationError) { + _generationErrors.add(generationError); + } + + public List<String> getSchedulingErrors() { + return _schedulingErrors; + } + + public void addSchedulingError(String schedulingError) { + _schedulingErrors.add(schedulingError); + } +} diff --git a/pinot-controller/src/main/resources/app/pages/SubTaskDetail.tsx b/pinot-controller/src/main/resources/app/pages/SubTaskDetail.tsx index f94cd4d820..27ab5dc273 100644 --- a/pinot-controller/src/main/resources/app/pages/SubTaskDetail.tsx +++ b/pinot-controller/src/main/resources/app/pages/SubTaskDetail.tsx @@ -113,6 +113,9 @@ const TaskDetail = (props) => { <Grid item xs={12}> <strong>Finish Time:</strong> {get(taskDebugData, 'finishTime', '')} </Grid> + <Grid item xs={12}> + <strong>Triggered By:</strong> {get(taskDebugData, 'triggeredBy', '')} + </Grid> <Grid item xs={12}> <strong>Minion Host Name:</strong> {get(taskDebugData, 'participant', '')} </Grid> diff --git a/pinot-controller/src/main/resources/app/pages/TaskDetail.tsx b/pinot-controller/src/main/resources/app/pages/TaskDetail.tsx index f825cfbd05..eff82fd3fd 100644 --- a/pinot-controller/src/main/resources/app/pages/TaskDetail.tsx +++ b/pinot-controller/src/main/resources/app/pages/TaskDetail.tsx @@ -124,6 +124,9 @@ const TaskDetail = (props) => { <Grid item xs={12}> <strong>Finish Time:</strong> {get(taskDebugData, 'finishTime', '')} </Grid> + <Grid item xs={12}> + <strong>Triggered By:</strong> {get(taskDebugData, 'triggeredBy', '')} + </Grid> <Grid item xs={12}> <strong>Number of Sub Tasks:</strong> {get(taskDebugData, 'subtaskCount.total', '')} </Grid> diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerStatelessTest.java index 132e109796..15b584eabf 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerStatelessTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerStatelessTest.java @@ -65,6 +65,7 @@ import static org.testng.Assert.*; @Test(groups = "stateless") public class PinotTaskManagerStatelessTest extends ControllerTest { private static final String RAW_TABLE_NAME = "myTable"; + public static final String TABLE_NAME_WITH_TYPE = "myTable_OFFLINE"; private static final String OFFLINE_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME); private static final long TIMEOUT_IN_MS = 10_000L; private static final Logger LOGGER = LoggerFactory.getLogger(PinotTaskManagerStatelessTest.class); @@ -192,10 +193,15 @@ public class PinotTaskManagerStatelessTest extends ControllerTest { public void testPinotTaskManagerScheduleTaskWithStoppedTaskQueue() throws Exception { testValidateTaskGeneration(taskManager -> { + String taskName = "SegmentGenerationAndPushTask"; + TaskSchedulingContext context = new TaskSchedulingContext() + .setTablesToSchedule(Collections.singleton(TABLE_NAME_WITH_TYPE)) + .setTasksToSchedule(Collections.singleton(taskName)); // Validate schedule tasks for table when task queue is in stopped state - List<String> taskIDs = taskManager.scheduleTaskForTable("SegmentGenerationAndPushTask", "myTable", null) - .getScheduledTaskNames(); - assertNull(taskIDs); + TaskSchedulingInfo info = taskManager.scheduleTasks(context).get(taskName); + assertNotNull(info); + assertNull(info.getScheduledTaskNames()); + assertFalse(info.getSchedulingErrors().isEmpty()); return null; }); } diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/MinionTaskTestUtils.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/MinionTaskTestUtils.java index 849a8b8bfd..2291bbb858 100644 --- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/MinionTaskTestUtils.java +++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/MinionTaskTestUtils.java @@ -20,6 +20,8 @@ package org.apache.pinot.integration.tests; import java.util.Map; import org.apache.pinot.controller.helix.core.minion.PinotTaskManager; +import org.apache.pinot.controller.helix.core.minion.TaskSchedulingContext; +import org.apache.pinot.controller.helix.core.minion.TaskSchedulingInfo; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; @@ -29,25 +31,18 @@ public class MinionTaskTestUtils { private MinionTaskTestUtils() { } - public static void assertNoTaskSchedule(String tableNameWithType, String taskType, PinotTaskManager taskManager) { - PinotTaskManager.TaskSchedulingInfo info = - taskManager.scheduleAllTasksForTable(tableNameWithType, null).get(taskType); - assertNoTaskSchedule(info); - } - - public static void assertNoTaskSchedule(String taskType, PinotTaskManager taskManager) { - PinotTaskManager.TaskSchedulingInfo info = taskManager.scheduleTaskForAllTables(taskType, null); - assertNoTaskSchedule(info); - } - - public static void assertNoTaskSchedule(PinotTaskManager taskManager) { - Map<String, PinotTaskManager.TaskSchedulingInfo> infoMap = taskManager.scheduleAllTasksForAllTables(null); + public static void assertNoTaskSchedule(TaskSchedulingContext context, PinotTaskManager taskManager) { + Map<String, TaskSchedulingInfo> infoMap = taskManager.scheduleTasks(context); infoMap.forEach((key, value) -> assertNoTaskSchedule(value)); } - public static void assertNoTaskSchedule(PinotTaskManager.TaskSchedulingInfo info) { + public static void assertNoTaskSchedule(TaskSchedulingInfo info) { assertNotNull(info.getScheduledTaskNames()); assertTrue(info.getScheduledTaskNames().isEmpty()); + assertNoTaskErrors(info); + } + + public static void assertNoTaskErrors(TaskSchedulingInfo info) { assertNotNull(info.getGenerationErrors()); assertTrue(info.getGenerationErrors().isEmpty()); assertNotNull(info.getSchedulingErrors()); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java index b8833d10b1..71a55da67d 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java @@ -42,6 +42,7 @@ import org.apache.pinot.common.utils.TarCompressionUtils; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager; import org.apache.pinot.controller.helix.core.minion.PinotTaskManager; +import org.apache.pinot.controller.helix.core.minion.TaskSchedulingContext; import org.apache.pinot.core.common.MinionConstants; import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; @@ -408,18 +409,20 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat long expectedWatermark = 16000 * 86_400_000L; String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(SINGLE_LEVEL_CONCAT_TEST_TABLE); int numTasks = 0; + TaskSchedulingContext context = new TaskSchedulingContext() + .setTablesToSchedule(Collections.singleton(offlineTableName)); List<String> taskList; - for (String tasks = _taskManager.scheduleAllTasksForTable(offlineTableName, null) + for (String tasks = _taskManager.scheduleTasks(context) .get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames().get(0); tasks != null; - taskList = _taskManager.scheduleAllTasksForTable(offlineTableName, null) + taskList = _taskManager.scheduleTasks(context) .get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames(), tasks = taskList != null && !taskList.isEmpty() ? taskList.get(0) : null, numTasks++) { assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]); assertTrue(_helixTaskResourceManager.getTaskQueues() .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE))); // Will not schedule task if there's incomplete task - assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null) + assertNull(_taskManager.scheduleTasks(context) .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)); waitForTaskToComplete(); @@ -524,18 +527,20 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat long expectedWatermark = 16000 * 86_400_000L; String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(SINGLE_LEVEL_CONCAT_METADATA_TEST_TABLE); int numTasks = 0; + TaskSchedulingContext context = new TaskSchedulingContext() + .setTablesToSchedule(Collections.singleton(offlineTableName)); List<String> taskList; - for (String tasks = _taskManager.scheduleAllTasksForTable(offlineTableName, null) + for (String tasks = _taskManager.scheduleTasks(context) .get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames().get(0); tasks != null; - taskList = _taskManager.scheduleAllTasksForTable(offlineTableName, null) + taskList = _taskManager.scheduleTasks(context) .get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames(), tasks = taskList != null && !taskList.isEmpty() ? taskList.get(0) : null, numTasks++) { assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]); assertTrue(_helixTaskResourceManager.getTaskQueues() .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE))); // Will not schedule task if there's incomplete task - assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null) + assertNull(_taskManager.scheduleTasks(context) .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)); waitForTaskToComplete(); @@ -633,18 +638,20 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat long expectedWatermark = 16050 * 86_400_000L; String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(SINGLE_LEVEL_ROLLUP_TEST_TABLE); int numTasks = 0; + TaskSchedulingContext context = new TaskSchedulingContext() + .setTablesToSchedule(Collections.singleton(offlineTableName)); List<String> taskList; - for (String tasks = _taskManager.scheduleAllTasksForTable(offlineTableName, null) + for (String tasks = _taskManager.scheduleTasks(context) .get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames().get(0); tasks != null; - taskList = _taskManager.scheduleAllTasksForTable(offlineTableName, null) + taskList = _taskManager.scheduleTasks(context) .get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames(), tasks = taskList != null && !taskList.isEmpty() ? taskList.get(0) : null, numTasks++) { assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), 1); assertTrue(_helixTaskResourceManager.getTaskQueues() .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE))); // Will not schedule task if there's incomplete task - assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null) + assertNull(_taskManager.scheduleTasks(context) .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)); waitForTaskToComplete(); @@ -785,18 +792,20 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(MULTI_LEVEL_CONCAT_TEST_TABLE); int numTasks = 0; + TaskSchedulingContext context = new TaskSchedulingContext() + .setTablesToSchedule(Collections.singleton(offlineTableName)); List<String> taskList; - for (String tasks = _taskManager.scheduleAllTasksForTable(offlineTableName, null) + for (String tasks = _taskManager.scheduleTasks(context) .get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames().get(0); tasks != null; - taskList = _taskManager.scheduleAllTasksForTable(offlineTableName, null) + taskList = _taskManager.scheduleTasks(context) .get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames(), tasks = taskList != null && !taskList.isEmpty() ? taskList.get(0) : null, numTasks++) { assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]); assertTrue(_helixTaskResourceManager.getTaskQueues() .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE))); // Will not schedule task if there's incomplete task - assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null) + assertNull(_taskManager.scheduleTasks(context) .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)); waitForTaskToComplete(); @@ -918,11 +927,13 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat long expectedWatermark = 16000 * 86_400_000L; String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName); int numTasks = 0; + TaskSchedulingContext context = new TaskSchedulingContext() + .setTablesToSchedule(Collections.singleton(realtimeTableName)); List<String> taskList; - for (String tasks = taskManager.scheduleAllTasksForTable(realtimeTableName, null) + for (String tasks = taskManager.scheduleTasks(context) .get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames().get(0); tasks != null; - taskList = taskManager.scheduleAllTasksForTable(realtimeTableName, null) + taskList = taskManager.scheduleTasks(context) .get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames(), tasks = taskList != null && !taskList.isEmpty() ? taskList.get(0) : null, numTasks++) { // assertEquals(helixTaskResourceManager.getSubtaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]); @@ -930,7 +941,7 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE))); // Will not schedule task if there's incomplete task - assertNull(taskManager.scheduleAllTasksForTable(realtimeTableName, null) + assertNull(taskManager.scheduleTasks(context) .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)); waitForTaskToComplete(); @@ -1024,17 +1035,19 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat long[] expectedNumBucketsToProcess200Days = {0, 0, 1, 1, 0, 0, 1, 1}; String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName); int numTasks = 0; + TaskSchedulingContext context = new TaskSchedulingContext() + .setTablesToSchedule(Collections.singleton(realtimeTableName)); List<String> taskList; - for (String tasks = taskManager.scheduleAllTasksForTable(realtimeTableName, null). - get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames().get(0); tasks != null; - taskList = taskManager.scheduleAllTasksForTable(realtimeTableName, null) + for (String tasks = taskManager.scheduleTasks(context) + .get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames().get(0); tasks != null; + taskList = taskManager.scheduleTasks(context) .get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames(), tasks = taskList != null && !taskList.isEmpty() ? taskList.get(0) : null, numTasks++) { assertTrue(helixTaskResourceManager.getTaskQueues() .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE))); // Will not schedule task if there's incomplete task - assertNull(taskManager.scheduleAllTasksForTable(realtimeTableName, null) + assertNull(taskManager.scheduleTasks(context) .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)); waitForTaskToComplete(); @@ -1066,10 +1079,10 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat uploadSegments(MULTI_LEVEL_CONCAT_PROCESS_ALL_REALTIME_TABLE, TableType.REALTIME, _tarDir5); waitForAllDocsLoaded(600_000L); - for (String tasks = taskManager.scheduleAllTasksForTable(realtimeTableName, null) + for (String tasks = taskManager.scheduleTasks(context) .get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames().get(0); tasks != null; - taskList = taskManager.scheduleAllTasksForTable(realtimeTableName, null) + taskList = taskManager.scheduleTasks(context) .get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames(), tasks = taskList != null && !taskList.isEmpty() ? taskList.get(0) : null, numTasks++) { waitForTaskToComplete(); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java index fed10b9f1b..19459894c7 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java @@ -33,6 +33,7 @@ import org.apache.pinot.common.minion.MinionTaskMetadataUtils; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager; import org.apache.pinot.controller.helix.core.minion.PinotTaskManager; +import org.apache.pinot.controller.helix.core.minion.TaskSchedulingContext; import org.apache.pinot.core.common.MinionConstants; import org.apache.pinot.minion.MinionContext; import org.apache.pinot.spi.config.table.IndexingConfig; @@ -185,12 +186,16 @@ public class PurgeMinionClusterIntegrationTest extends BaseClusterIntegrationTes // 5. Check the purge process itself by setting an expecting number of rows String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(PURGE_FIRST_RUN_TABLE); - assertNotNull( - _taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.PurgeTask.TASK_TYPE)); + assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext() + .setTablesToSchedule(Collections.singleton(offlineTableName))) + .get(MinionConstants.PurgeTask.TASK_TYPE)); assertTrue(_helixTaskResourceManager.getTaskQueues() .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.PurgeTask.TASK_TYPE))); // Will not schedule task if there's incomplete task - MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName, MinionConstants.PurgeTask.TASK_TYPE, _taskManager); + MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext() + .setTablesToSchedule(Collections.singleton(offlineTableName)) + .setTasksToSchedule(Collections.singleton(MinionConstants.PurgeTask.TASK_TYPE)), + _taskManager); waitForTaskToComplete(); // Check that metadata contains expected values @@ -200,7 +205,10 @@ public class PurgeMinionClusterIntegrationTest extends BaseClusterIntegrationTes metadata.getCustomMap().containsKey(MinionConstants.PurgeTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX)); } // Should not generate new purge task as the last time purge is not greater than last + 1day (default purge delay) - MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName, MinionConstants.PurgeTask.TASK_TYPE, _taskManager); + MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext() + .setTablesToSchedule(Collections.singleton(offlineTableName)) + .setTasksToSchedule(Collections.singleton(MinionConstants.PurgeTask.TASK_TYPE)), + _taskManager); // 52 rows with ArrTime = 1 // 115545 totals rows @@ -231,11 +239,16 @@ public class PurgeMinionClusterIntegrationTest extends BaseClusterIntegrationTes String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(PURGE_DELTA_PASSED_TABLE); assertNotNull( - _taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.PurgeTask.TASK_TYPE)); + _taskManager.scheduleTasks(new TaskSchedulingContext() + .setTablesToSchedule(Collections.singleton(offlineTableName))) + .get(MinionConstants.PurgeTask.TASK_TYPE)); assertTrue(_helixTaskResourceManager.getTaskQueues() .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.PurgeTask.TASK_TYPE))); // Will not schedule task if there's incomplete task - MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName, MinionConstants.PurgeTask.TASK_TYPE, _taskManager); + MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext() + .setTablesToSchedule(Collections.singleton(offlineTableName)) + .setTasksToSchedule(Collections.singleton(MinionConstants.PurgeTask.TASK_TYPE)), + _taskManager); waitForTaskToComplete(); // Check that metadata contains expected values @@ -247,7 +260,10 @@ public class PurgeMinionClusterIntegrationTest extends BaseClusterIntegrationTes assertTrue(System.currentTimeMillis() - Long.parseLong(purgeTime) < 86400000); } // Should not generate new purge task as the last time purge is not greater than last + 1day (default purge delay) - MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName, MinionConstants.PurgeTask.TASK_TYPE, _taskManager); + MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext() + .setTablesToSchedule(Collections.singleton(offlineTableName)) + .setTasksToSchedule(Collections.singleton(MinionConstants.PurgeTask.TASK_TYPE)), + _taskManager); // 52 rows with ArrTime = 1 // 115545 totals rows @@ -279,7 +295,10 @@ public class PurgeMinionClusterIntegrationTest extends BaseClusterIntegrationTes String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(PURGE_DELTA_NOT_PASSED_TABLE); // No task should be schedule as the delay is not passed - MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName, MinionConstants.PurgeTask.TASK_TYPE, _taskManager); + MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext() + .setTablesToSchedule(Collections.singleton(offlineTableName)) + .setTasksToSchedule(Collections.singleton(MinionConstants.PurgeTask.TASK_TYPE)), + _taskManager); for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) { // Check purge time String purgeTime = @@ -330,11 +349,15 @@ public class PurgeMinionClusterIntegrationTest extends BaseClusterIntegrationTes // schedule purge tasks String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE); - assertNotNull( - _taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.PurgeTask.TASK_TYPE)); + assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext() + .setTablesToSchedule(Collections.singleton(offlineTableName))) + .get(MinionConstants.PurgeTask.TASK_TYPE)); assertTrue(_helixTaskResourceManager.getTaskQueues() .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.PurgeTask.TASK_TYPE))); - MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName, MinionConstants.PurgeTask.TASK_TYPE, _taskManager); + MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext() + .setTablesToSchedule(Collections.singleton(offlineTableName)) + .setTasksToSchedule(Collections.singleton(MinionConstants.PurgeTask.TASK_TYPE)), + _taskManager); waitForTaskToComplete(); // Check that metadata contains expected values diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java index 296c981c18..15829f57b0 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java @@ -33,6 +33,7 @@ import org.apache.pinot.common.minion.MinionTaskMetadataUtils; import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata; import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager; import org.apache.pinot.controller.helix.core.minion.PinotTaskManager; +import org.apache.pinot.controller.helix.core.minion.TaskSchedulingContext; import org.apache.pinot.core.common.MinionConstants; import org.apache.pinot.spi.config.table.ColumnPartitionConfig; import org.apache.pinot.spi.config.table.FieldConfig; @@ -231,13 +232,16 @@ public class RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends BaseC long expectedWatermark = _dataSmallestTimeMs + 86400000; for (int i = 0; i < 3; i++) { // Schedule task - assertNotNull(_taskManager.scheduleAllTasksForTable(_realtimeTableName, null) + assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext() + .setTablesToSchedule(Collections.singleton(_realtimeTableName))) .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)); assertTrue(_taskResourceManager.getTaskQueues().contains( PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE))); // Should not generate more tasks - MinionTaskTestUtils.assertNoTaskSchedule(_realtimeTableName, - MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, _taskManager); + MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext() + .setTablesToSchedule(Collections.singleton(_realtimeTableName)) + .setTasksToSchedule(Collections.singleton(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)), + _taskManager); // Wait at most 600 seconds for all tasks COMPLETED waitForTaskToComplete(expectedWatermark, _realtimeTableName); @@ -283,13 +287,16 @@ public class RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends BaseC _taskManager.cleanUpTask(); for (int i = 0; i < 3; i++) { // Schedule task - assertNotNull(_taskManager.scheduleAllTasksForTable(_realtimeMetadataTableName, null) + assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext() + .setTablesToSchedule(Collections.singleton(_realtimeMetadataTableName))) .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)); assertTrue(_taskResourceManager.getTaskQueues().contains( PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE))); // Should not generate more tasks - MinionTaskTestUtils.assertNoTaskSchedule(_realtimeMetadataTableName, - MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, _taskManager); + MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext() + .setTablesToSchedule(Collections.singleton(_realtimeMetadataTableName)) + .setTasksToSchedule(Collections.singleton(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)), + _taskManager); // Wait at most 600 seconds for all tasks COMPLETED waitForTaskToComplete(expectedWatermark, _realtimeMetadataTableName); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java index c14f278cf6..9a0b23bae4 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java @@ -33,6 +33,7 @@ import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager; import org.apache.pinot.controller.helix.core.minion.PinotTaskManager; +import org.apache.pinot.controller.helix.core.minion.TaskSchedulingContext; import org.apache.pinot.core.common.MinionConstants; import org.apache.pinot.segment.spi.index.StandardIndexes; import org.apache.pinot.spi.config.table.FieldConfig; @@ -108,12 +109,15 @@ public class RefreshSegmentMinionClusterIntegrationTest extends BaseClusterInteg public void testFirstSegmentRefresh() { // This will create the inverted index as we disable inverted index creation during segment push. String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(getTableName()); - assertNotNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null) + assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext() + .setTablesToSchedule(Collections.singleton(offlineTableName))) .get(MinionConstants.RefreshSegmentTask.TASK_TYPE)); assertTrue(_helixTaskResourceManager.getTaskQueues() .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RefreshSegmentTask.TASK_TYPE))); // Will not schedule task if there's incomplete task - MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName, MinionConstants.RefreshSegmentTask.TASK_TYPE, + MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext() + .setTablesToSchedule(Collections.singleton(offlineTableName)) + .setTasksToSchedule(Collections.singleton(MinionConstants.RefreshSegmentTask.TASK_TYPE)), _taskManager); waitForTaskToComplete(); @@ -128,7 +132,9 @@ public class RefreshSegmentMinionClusterIntegrationTest extends BaseClusterInteg } // This should be no-op as nothing changes. - MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName, MinionConstants.RefreshSegmentTask.TASK_TYPE, + MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext() + .setTablesToSchedule(Collections.singleton(offlineTableName)) + .setTasksToSchedule(Collections.singleton(MinionConstants.RefreshSegmentTask.TASK_TYPE)), _taskManager); for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) { // Get the value in segment metadata @@ -153,12 +159,15 @@ public class RefreshSegmentMinionClusterIntegrationTest extends BaseClusterInteg schema.getFieldSpecFor("DestAirportID").setDataType(FieldSpec.DataType.STRING); addSchema(schema); - assertNotNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null) + assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext() + .setTablesToSchedule(Collections.singleton(offlineTableName))) .get(MinionConstants.RefreshSegmentTask.TASK_TYPE)); assertTrue(_helixTaskResourceManager.getTaskQueues() .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RefreshSegmentTask.TASK_TYPE))); // Will not schedule task if there's incomplete task - MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName, MinionConstants.RefreshSegmentTask.TASK_TYPE, + MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext() + .setTablesToSchedule(Collections.singleton(offlineTableName)) + .setTasksToSchedule(Collections.singleton(MinionConstants.RefreshSegmentTask.TASK_TYPE)), _taskManager); waitForTaskToComplete(); @@ -232,12 +241,15 @@ public class RefreshSegmentMinionClusterIntegrationTest extends BaseClusterInteg updateTableConfig(tableConfig); String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(getTableName()); - assertNotNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null) + assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext() + .setTablesToSchedule(Collections.singleton(offlineTableName))) .get(MinionConstants.RefreshSegmentTask.TASK_TYPE)); assertTrue(_helixTaskResourceManager.getTaskQueues() .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RefreshSegmentTask.TASK_TYPE))); // Will not schedule task if there's incomplete task - MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName, MinionConstants.RefreshSegmentTask.TASK_TYPE, + MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext() + .setTablesToSchedule(Collections.singleton(offlineTableName)) + .setTasksToSchedule(Collections.singleton(MinionConstants.RefreshSegmentTask.TASK_TYPE)), _taskManager); waitForTaskToComplete(); @@ -323,12 +335,15 @@ public class RefreshSegmentMinionClusterIntegrationTest extends BaseClusterInteg String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(getTableName()); - assertNotNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null) + assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext() + .setTablesToSchedule(Collections.singleton(offlineTableName))) .get(MinionConstants.RefreshSegmentTask.TASK_TYPE)); assertTrue(_helixTaskResourceManager.getTaskQueues() .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RefreshSegmentTask.TASK_TYPE))); // Will not schedule task if there's incomplete task - MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName, MinionConstants.RefreshSegmentTask.TASK_TYPE, + MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext() + .setTablesToSchedule(Collections.singleton(offlineTableName)) + .setTasksToSchedule(Collections.singleton(MinionConstants.RefreshSegmentTask.TASK_TYPE)), _taskManager); waitForTaskToComplete(); @@ -401,12 +416,15 @@ public class RefreshSegmentMinionClusterIntegrationTest extends BaseClusterInteg updateTableConfig(tableConfig); - assertNotNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null) + assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext() + .setTablesToSchedule(Collections.singleton(offlineTableName))) .get(MinionConstants.RefreshSegmentTask.TASK_TYPE)); assertTrue(_helixTaskResourceManager.getTaskQueues() .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RefreshSegmentTask.TASK_TYPE))); // Will not schedule task if there's incomplete task - MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName, MinionConstants.RefreshSegmentTask.TASK_TYPE, + MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext() + .setTablesToSchedule(Collections.singleton(offlineTableName)) + .setTasksToSchedule(Collections.singleton(MinionConstants.RefreshSegmentTask.TASK_TYPE)), _taskManager); waitForTaskToComplete(); @@ -423,7 +441,9 @@ public class RefreshSegmentMinionClusterIntegrationTest extends BaseClusterInteg } // This should be no-op as nothing changes. - MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName, MinionConstants.RefreshSegmentTask.TASK_TYPE, + MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext() + .setTablesToSchedule(Collections.singleton(offlineTableName)) + .setTasksToSchedule(Collections.singleton(MinionConstants.RefreshSegmentTask.TASK_TYPE)), _taskManager); for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) { // Get the value in segment metadata diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java index 3071d9c7fb..00dce3341c 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java @@ -33,6 +33,7 @@ import org.apache.pinot.common.metrics.MetricValueUtils; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager; import org.apache.pinot.controller.helix.core.minion.PinotTaskManager; +import org.apache.pinot.controller.helix.core.minion.TaskSchedulingContext; import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator; import org.apache.pinot.core.common.MinionConstants; import org.apache.pinot.minion.executor.PinotTaskExecutor; @@ -137,7 +138,7 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest { // Should create the task queues and generate a task in the same minion instance List<String> task1 = - _taskManager.scheduleAllTasksForAllTables(null).get(TASK_TYPE).getScheduledTaskNames(); + _taskManager.scheduleTasks(new TaskSchedulingContext()).get(TASK_TYPE).getScheduledTaskNames(); assertNotNull(task1); assertEquals(task1.size(), 1); assertTrue(_helixTaskResourceManager.getTaskQueues() @@ -151,7 +152,7 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest { verifyTaskCount(task1.get(0), 0, 1, 1, 2); // Should generate one more task, with two sub-tasks. Both of these sub-tasks will wait // since we have one minion instance that is still running one of the sub-tasks. - List<String> task2 = _taskManager.scheduleTaskForAllTables(TASK_TYPE, null).getScheduledTaskNames(); + List<String> task2 = _taskManager.scheduleTasks(new TaskSchedulingContext()).get(TASK_TYPE).getScheduledTaskNames(); assertNotNull(task2); assertEquals(task2.size(), 1); assertTrue(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).contains(task2.get(0))); @@ -160,8 +161,7 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest { // Should not generate more tasks since SimpleMinionClusterIntegrationTests.NUM_TASKS is 2. // Our test task generator does not generate if there are already this many sub-tasks in the // running+waiting count already. - MinionTaskTestUtils.assertNoTaskSchedule(_taskManager); - MinionTaskTestUtils.assertNoTaskSchedule(TASK_TYPE, _taskManager); + MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext(), _taskManager); // Wait at most 60 seconds for all tasks IN_PROGRESS TestUtils.waitForCondition(input -> { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TlsIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TlsIntegrationTest.java index e5c35f6dbd..eceafc732c 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TlsIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TlsIntegrationTest.java @@ -58,6 +58,7 @@ import org.apache.pinot.common.utils.SimpleHttpResponse; import org.apache.pinot.common.utils.helix.HelixHelper; import org.apache.pinot.common.utils.tls.TlsUtils; import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.helix.core.minion.TaskSchedulingContext; import org.apache.pinot.core.common.MinionConstants; import org.apache.pinot.integration.tests.access.CertBasedTlsChannelAccessControlFactory; import org.apache.pinot.spi.config.table.TableConfig; @@ -478,7 +479,7 @@ public class TlsIntegrationTest extends BaseClusterIntegrationTest { Assert.assertTrue(resultBeforeOffline.getResultSet(0).getLong(0) > 0); // schedule offline segment generation - Assert.assertNotNull(_controllerStarter.getTaskManager().scheduleAllTasksForAllTables(null)); + Assert.assertNotNull(_controllerStarter.getTaskManager().scheduleTasks(new TaskSchedulingContext())); // wait for offline segments JsonNode offlineSegments = TestUtils.waitForResult(() -> { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java index 6d965ffae0..3aa15e1fb0 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java @@ -34,6 +34,7 @@ import org.apache.pinot.common.restlet.resources.ValidDocIdsType; import org.apache.pinot.common.utils.config.TagNameUtils; import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager; import org.apache.pinot.controller.helix.core.minion.PinotTaskManager; +import org.apache.pinot.controller.helix.core.minion.TaskSchedulingContext; import org.apache.pinot.core.common.MinionConstants; import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager; import org.apache.pinot.core.data.manager.realtime.SegmentBuildTimeLeaseExtender; @@ -468,7 +469,8 @@ public class UpsertTableIntegrationTest extends BaseClusterIntegrationTest { sendPostRequest(_controllerRequestURLBuilder.forResumeConsumption(tableName)); waitForNumQueriedSegmentsToConverge(tableName, 600_000L, 5, 2); String realtimeTableName = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName); - assertNotNull(_taskManager.scheduleAllTasksForTable(realtimeTableName, null) + assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext() + .setTablesToSchedule(Collections.singleton(realtimeTableName))) .get(MinionConstants.UpsertCompactionTask.TASK_TYPE)); waitForTaskToComplete(); // 2 segments should be compacted (351 rows -> 1 row; 500 rows -> 2 rows), 1 segment (149 rows) should be deleted @@ -501,7 +503,8 @@ public class UpsertTableIntegrationTest extends BaseClusterIntegrationTest { // NOTE: When in-memory valid doc ids are used, no need to pause/resume consumption to trigger the snapshot. String realtimeTableName = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName); - assertNotNull(_taskManager.scheduleAllTasksForTable(realtimeTableName, null) + assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext() + .setTablesToSchedule(Collections.singleton(realtimeTableName))) .get(MinionConstants.UpsertCompactionTask.TASK_TYPE)); waitForTaskToComplete(); // 1 segment should be compacted (500 rows -> 2 rows) @@ -544,7 +547,8 @@ public class UpsertTableIntegrationTest extends BaseClusterIntegrationTest { waitForNumQueriedSegmentsToConverge(tableName, 10_000L, 5, 2); String realtimeTableName = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName); - assertNotNull(_taskManager.scheduleAllTasksForTable(realtimeTableName, null) + assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext() + .setTablesToSchedule(Collections.singleton(realtimeTableName))) .get(MinionConstants.UpsertCompactionTask.TASK_TYPE)); waitForTaskToComplete(); // 1 segment should be compacted (351 rows -> 1 rows), 2 segments (500 rows, 151 rows) should be deleted diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UrlAuthRealtimeIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UrlAuthRealtimeIntegrationTest.java index 22d24115d9..ade04ba12e 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UrlAuthRealtimeIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UrlAuthRealtimeIntegrationTest.java @@ -32,6 +32,7 @@ import org.apache.pinot.client.JsonAsyncHttpPinotClientTransportFactory; import org.apache.pinot.client.ResultSetGroup; import org.apache.pinot.common.auth.UrlAuthProvider; import org.apache.pinot.controller.helix.ControllerRequestClient; +import org.apache.pinot.controller.helix.core.minion.TaskSchedulingContext; import org.apache.pinot.core.common.MinionConstants; import org.apache.pinot.spi.config.table.TableTaskConfig; import org.apache.pinot.spi.config.table.TableType; @@ -176,7 +177,7 @@ public class UrlAuthRealtimeIntegrationTest extends BaseClusterIntegrationTest { Assert.assertTrue(resultBeforeOffline.getResultSet(0).getLong(0) > 0); // schedule offline segment generation - Assert.assertNotNull(_controllerStarter.getTaskManager().scheduleAllTasksForAllTables(null)); + Assert.assertNotNull(_controllerStarter.getTaskManager().scheduleTasks(new TaskSchedulingContext())); // wait for offline segments List<String> offlineSegments = TestUtils.waitForResult(() -> { diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 19fbb75216..f1f197bfcd 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -79,6 +79,10 @@ public class CommonConstants { CONSUMING, NOT_CONSUMING // In error state } + public enum TaskTriggers { + CRON_TRIGGER, MANUAL_TRIGGER, ADHOC_TRIGGER, UNKNOWN + } + public static class Table { public static final String PUSH_FREQUENCY_HOURLY = "hourly"; public static final String PUSH_FREQUENCY_DAILY = "daily"; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org