This is an automated email from the ASF dual-hosted git repository. vvivekiyer pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new d206f127f3 Allowing users to pass minionInstanceTag as a param in /tasks/schedule API (#12786) d206f127f3 is described below commit d206f127f33f2843c288b483e16e22963dac6b4f Author: Pratik Tibrewal <tibrewalpra...@uber.com> AuthorDate: Thu Apr 18 03:44:20 2024 +0530 Allowing users to pass minionInstanceTag as a param in /tasks/schedule API (#12786) * Allowing users to pass minionInstanceTag in tasks/schedule API * add nullable annotation --- .../api/resources/PinotTaskRestletResource.java | 11 +++-- .../helix/core/minion/PinotTaskManager.java | 48 ++++++++++++++-------- .../tests/SimpleMinionClusterIntegrationTest.java | 4 +- 3 files changed, 40 insertions(+), 23 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java index c51c266587..e09bde8466 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java @@ -618,19 +618,22 @@ public class PinotTaskRestletResource { @ApiOperation("Schedule tasks and return a map from task type to task name scheduled") public Map<String, String> scheduleTasks(@ApiParam(value = "Task type") @QueryParam("taskType") String taskType, @ApiParam(value = "Table name (with type suffix)") @QueryParam("tableName") String tableName, + @ApiParam(value = "Minion Instance tag to schedule the task explicitly on") + @QueryParam("minionInstanceTag") @Nullable String minionInstanceTag, @Context HttpHeaders headers) { String database = headers != null ? headers.getHeaderString(DATABASE) : DEFAULT_DATABASE; if (taskType != null) { // Schedule task for the given task type List<String> taskNames = tableName != null - ? _pinotTaskManager.scheduleTask(taskType, DatabaseUtils.translateTableName(tableName, headers)) - : _pinotTaskManager.scheduleTaskForDatabase(taskType, database); + ? _pinotTaskManager.scheduleTask(taskType, + DatabaseUtils.translateTableName(tableName, headers), minionInstanceTag) + : _pinotTaskManager.scheduleTaskForDatabase(taskType, database, minionInstanceTag); return Collections.singletonMap(taskType, taskNames == null ? null : StringUtils.join(taskNames, ',')); } else { // Schedule tasks for all task types Map<String, List<String>> allTaskNames = tableName != null - ? _pinotTaskManager.scheduleTasks(DatabaseUtils.translateTableName(tableName, headers)) - : _pinotTaskManager.scheduleTasksForDatabase(database); + ? _pinotTaskManager.scheduleTasks(DatabaseUtils.translateTableName(tableName, headers), minionInstanceTag) + : _pinotTaskManager.scheduleTasksForDatabase(database, minionInstanceTag); return allTaskNames.entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, entry -> String.join(",", entry.getValue()))); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java index 2cdbf8c1df..4029944139 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java @@ -485,7 +485,7 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { * Returns a map from the task type to the list of tasks scheduled. */ public synchronized Map<String, List<String>> scheduleTasks() { - return scheduleTasks(_pinotHelixResourceManager.getAllTables(), false); + return scheduleTasks(_pinotHelixResourceManager.getAllTables(), false, null); } /** @@ -493,15 +493,17 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { * It might be called from the non-leader controller. * Returns a map from the task type to the list of tasks scheduled. */ - public synchronized Map<String, List<String>> scheduleTasksForDatabase(@Nullable String database) { - return scheduleTasks(_pinotHelixResourceManager.getAllTables(database), false); + public synchronized Map<String, List<String>> scheduleTasksForDatabase(@Nullable String database, + @Nullable String minionInstanceTag) { + return scheduleTasks(_pinotHelixResourceManager.getAllTables(database), false, minionInstanceTag); } /** * Helper method to schedule tasks (all task types) for the given tables that have the tasks enabled. Returns a map * from the task type to the list of the tasks scheduled. */ - private synchronized Map<String, List<String>> scheduleTasks(List<String> tableNamesWithType, boolean isLeader) { + private synchronized Map<String, List<String>> scheduleTasks(List<String> tableNamesWithType, + boolean isLeader, @Nullable String minionInstanceTag) { _controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED, 1L); // Scan all table configs to get the tables with tasks enabled @@ -525,7 +527,7 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { if (taskGenerator != null) { _helixTaskResourceManager.ensureTaskQueueExists(taskType); addTaskTypeMetricsUpdaterIfNeeded(taskType); - tasksScheduled.put(taskType, scheduleTask(taskGenerator, enabledTableConfigs, isLeader)); + tasksScheduled.put(taskType, scheduleTask(taskGenerator, enabledTableConfigs, isLeader, minionInstanceTag)); } else { List<String> enabledTables = new ArrayList<>(enabledTableConfigs.size()); for (TableConfig enabledTableConfig : enabledTableConfigs) { @@ -545,14 +547,15 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { */ @Nullable private List<String> scheduleTask(PinotTaskGenerator taskGenerator, List<TableConfig> enabledTableConfigs, - boolean isLeader) { + boolean isLeader, @Nullable String minionInstanceTagForTask) { LOGGER.info("Trying to schedule task type: {}, isLeader: {}", taskGenerator.getTaskType(), isLeader); Map<String, List<PinotTaskConfig>> minionInstanceTagToTaskConfigs = new HashMap<>(); String taskType = taskGenerator.getTaskType(); for (TableConfig tableConfig : enabledTableConfigs) { String tableName = tableConfig.getTableName(); try { - String minionInstanceTag = taskGenerator.getMinionInstanceTag(tableConfig); + String minionInstanceTag = minionInstanceTagForTask != null + ? minionInstanceTagForTask : taskGenerator.getMinionInstanceTag(tableConfig); List<PinotTaskConfig> presentTaskConfig = minionInstanceTagToTaskConfigs.computeIfAbsent(minionInstanceTag, k -> new ArrayList<>()); taskGenerator.generateTasks(List.of(tableConfig), presentTaskConfig); @@ -624,7 +627,16 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { * controller. Returns a map from the task type to the list of tasks scheduled. */ public synchronized Map<String, List<String>> scheduleTasks(String tableNameWithType) { - return scheduleTasks(Collections.singletonList(tableNameWithType), false); + return scheduleTasks(Collections.singletonList(tableNameWithType), false, null); + } + + /** + * Public API to schedule tasks (all task types) for the given table on a specific instance tag. + * It might be called from the non-leader controller. Returns a map from the task type to the list of tasks scheduled. + */ + public synchronized Map<String, List<String>> scheduleTasks(String tableNameWithType, + @Nullable String minionInstanceTag) { + return scheduleTasks(Collections.singletonList(tableNameWithType), false, minionInstanceTag); } /** @@ -633,8 +645,8 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { * Returns the list of task names, or {@code null} if no task is scheduled. */ @Nullable - public synchronized List<String> scheduleTask(String taskType) { - return scheduleTask(taskType, _pinotHelixResourceManager.getAllTables()); + public synchronized List<String> scheduleTask(String taskType, @Nullable String minionInstanceTag) { + return scheduleTask(taskType, _pinotHelixResourceManager.getAllTables(), minionInstanceTag); } /** @@ -643,12 +655,13 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { * Returns the list of task name, or {@code null} if no task is scheduled. */ @Nullable - public synchronized List<String> scheduleTaskForDatabase(String taskType, @Nullable String database) { - return scheduleTask(taskType, _pinotHelixResourceManager.getAllTables(database)); + public synchronized List<String> scheduleTaskForDatabase(String taskType, @Nullable String database, + @Nullable String minionInstanceTag) { + return scheduleTask(taskType, _pinotHelixResourceManager.getAllTables(database), minionInstanceTag); } @Nullable - private List<String> scheduleTask(String taskType, List<String> tables) { + private List<String> scheduleTask(String taskType, List<String> tables, @Nullable String minionInstanceTag) { PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType); Preconditions.checkState(taskGenerator != null, "Task type: %s is not registered", taskType); @@ -664,7 +677,7 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { _helixTaskResourceManager.ensureTaskQueueExists(taskType); addTaskTypeMetricsUpdaterIfNeeded(taskType); - return scheduleTask(taskGenerator, enabledTableConfigs, false); + return scheduleTask(taskGenerator, enabledTableConfigs, false, minionInstanceTag); } /** @@ -672,7 +685,8 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { * controller. Returns the list of task names, or {@code null} if no task is scheduled. */ @Nullable - public synchronized List<String> scheduleTask(String taskType, String tableNameWithType) { + public synchronized List<String> scheduleTask(String taskType, String tableNameWithType, + @Nullable String minionInstanceTag) { PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType); Preconditions.checkState(taskGenerator != null, "Task type: %s is not registered", taskType); @@ -685,12 +699,12 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { _helixTaskResourceManager.ensureTaskQueueExists(taskType); addTaskTypeMetricsUpdaterIfNeeded(taskType); - return scheduleTask(taskGenerator, Collections.singletonList(tableConfig), false); + return scheduleTask(taskGenerator, Collections.singletonList(tableConfig), false, minionInstanceTag); } @Override protected void processTables(List<String> tableNamesWithType, Properties taskProperties) { - scheduleTasks(tableNamesWithType, true); + scheduleTasks(tableNamesWithType, true, null); } @Override diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java index 1db953f00f..241c1c0876 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java @@ -150,7 +150,7 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest { verifyTaskCount(task1.get(0), 0, 1, 1, 2); // Should generate one more task, with two sub-tasks. Both of these sub-tasks will wait // since we have one minion instance that is still running one of the sub-tasks. - List<String> task2 = _taskManager.scheduleTask(TASK_TYPE); + List<String> task2 = _taskManager.scheduleTask(TASK_TYPE, null); assertNotNull(task2); assertEquals(task2.size(), 1); assertTrue(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).contains(task2.get(0))); @@ -160,7 +160,7 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest { // Our test task generator does not generate if there are already this many sub-tasks in the // running+waiting count already. assertNull(_taskManager.scheduleTasks().get(TASK_TYPE)); - assertNull(_taskManager.scheduleTask(TASK_TYPE)); + assertNull(_taskManager.scheduleTask(TASK_TYPE, null)); // Wait at most 60 seconds for all tasks IN_PROGRESS TestUtils.waitForCondition(input -> { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org