This is an automated email from the ASF dual-hosted git repository.

vvivekiyer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d206f127f3 Allowing users to pass minionInstanceTag as a param in 
/tasks/schedule API (#12786)
d206f127f3 is described below

commit d206f127f33f2843c288b483e16e22963dac6b4f
Author: Pratik Tibrewal <tibrewalpra...@uber.com>
AuthorDate: Thu Apr 18 03:44:20 2024 +0530

    Allowing users to pass minionInstanceTag as a param in /tasks/schedule API 
(#12786)
    
    * Allowing users to pass minionInstanceTag in tasks/schedule API
    
    * add nullable annotation
---
 .../api/resources/PinotTaskRestletResource.java    | 11 +++--
 .../helix/core/minion/PinotTaskManager.java        | 48 ++++++++++++++--------
 .../tests/SimpleMinionClusterIntegrationTest.java  |  4 +-
 3 files changed, 40 insertions(+), 23 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
index c51c266587..e09bde8466 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
@@ -618,19 +618,22 @@ public class PinotTaskRestletResource {
   @ApiOperation("Schedule tasks and return a map from task type to task name 
scheduled")
   public Map<String, String> scheduleTasks(@ApiParam(value = "Task type") 
@QueryParam("taskType") String taskType,
       @ApiParam(value = "Table name (with type suffix)") 
@QueryParam("tableName") String tableName,
+      @ApiParam(value = "Minion Instance tag to schedule the task explicitly 
on")
+      @QueryParam("minionInstanceTag") @Nullable String minionInstanceTag,
       @Context HttpHeaders headers) {
     String database = headers != null ? headers.getHeaderString(DATABASE) : 
DEFAULT_DATABASE;
     if (taskType != null) {
       // Schedule task for the given task type
       List<String> taskNames = tableName != null
-          ? _pinotTaskManager.scheduleTask(taskType, 
DatabaseUtils.translateTableName(tableName, headers))
-          : _pinotTaskManager.scheduleTaskForDatabase(taskType, database);
+          ? _pinotTaskManager.scheduleTask(taskType,
+          DatabaseUtils.translateTableName(tableName, headers), 
minionInstanceTag)
+          : _pinotTaskManager.scheduleTaskForDatabase(taskType, database, 
minionInstanceTag);
       return Collections.singletonMap(taskType, taskNames == null ? null : 
StringUtils.join(taskNames, ','));
     } else {
       // Schedule tasks for all task types
       Map<String, List<String>> allTaskNames = tableName != null
-          ? 
_pinotTaskManager.scheduleTasks(DatabaseUtils.translateTableName(tableName, 
headers))
-          : _pinotTaskManager.scheduleTasksForDatabase(database);
+          ? 
_pinotTaskManager.scheduleTasks(DatabaseUtils.translateTableName(tableName, 
headers), minionInstanceTag)
+          : _pinotTaskManager.scheduleTasksForDatabase(database, 
minionInstanceTag);
       return allTaskNames.entrySet().stream()
           .collect(Collectors.toMap(Map.Entry::getKey, entry -> 
String.join(",", entry.getValue())));
     }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index 2cdbf8c1df..4029944139 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -485,7 +485,7 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
    * Returns a map from the task type to the list of tasks scheduled.
    */
   public synchronized Map<String, List<String>> scheduleTasks() {
-    return scheduleTasks(_pinotHelixResourceManager.getAllTables(), false);
+    return scheduleTasks(_pinotHelixResourceManager.getAllTables(), false, 
null);
   }
 
   /**
@@ -493,15 +493,17 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
    * It might be called from the non-leader controller.
    * Returns a map from the task type to the list of tasks scheduled.
    */
-  public synchronized Map<String, List<String>> 
scheduleTasksForDatabase(@Nullable String database) {
-    return scheduleTasks(_pinotHelixResourceManager.getAllTables(database), 
false);
+  public synchronized Map<String, List<String>> 
scheduleTasksForDatabase(@Nullable String database,
+      @Nullable String minionInstanceTag) {
+    return scheduleTasks(_pinotHelixResourceManager.getAllTables(database), 
false, minionInstanceTag);
   }
 
   /**
    * Helper method to schedule tasks (all task types) for the given tables 
that have the tasks enabled. Returns a map
    * from the task type to the list of the tasks scheduled.
    */
-  private synchronized Map<String, List<String>> scheduleTasks(List<String> 
tableNamesWithType, boolean isLeader) {
+  private synchronized Map<String, List<String>> scheduleTasks(List<String> 
tableNamesWithType,
+      boolean isLeader, @Nullable String minionInstanceTag) {
     
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED,
 1L);
 
     // Scan all table configs to get the tables with tasks enabled
@@ -525,7 +527,7 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
       if (taskGenerator != null) {
         _helixTaskResourceManager.ensureTaskQueueExists(taskType);
         addTaskTypeMetricsUpdaterIfNeeded(taskType);
-        tasksScheduled.put(taskType, scheduleTask(taskGenerator, 
enabledTableConfigs, isLeader));
+        tasksScheduled.put(taskType, scheduleTask(taskGenerator, 
enabledTableConfigs, isLeader, minionInstanceTag));
       } else {
         List<String> enabledTables = new 
ArrayList<>(enabledTableConfigs.size());
         for (TableConfig enabledTableConfig : enabledTableConfigs) {
@@ -545,14 +547,15 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
    */
   @Nullable
   private List<String> scheduleTask(PinotTaskGenerator taskGenerator, 
List<TableConfig> enabledTableConfigs,
-      boolean isLeader) {
+      boolean isLeader, @Nullable String minionInstanceTagForTask) {
     LOGGER.info("Trying to schedule task type: {}, isLeader: {}", 
taskGenerator.getTaskType(), isLeader);
     Map<String, List<PinotTaskConfig>> minionInstanceTagToTaskConfigs = new 
HashMap<>();
     String taskType = taskGenerator.getTaskType();
     for (TableConfig tableConfig : enabledTableConfigs) {
       String tableName = tableConfig.getTableName();
       try {
-        String minionInstanceTag = 
taskGenerator.getMinionInstanceTag(tableConfig);
+        String minionInstanceTag = minionInstanceTagForTask != null
+            ? minionInstanceTagForTask : 
taskGenerator.getMinionInstanceTag(tableConfig);
         List<PinotTaskConfig> presentTaskConfig =
             minionInstanceTagToTaskConfigs.computeIfAbsent(minionInstanceTag, 
k -> new ArrayList<>());
         taskGenerator.generateTasks(List.of(tableConfig), presentTaskConfig);
@@ -624,7 +627,16 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
    * controller. Returns a map from the task type to the list of tasks 
scheduled.
    */
   public synchronized Map<String, List<String>> scheduleTasks(String 
tableNameWithType) {
-    return scheduleTasks(Collections.singletonList(tableNameWithType), false);
+    return scheduleTasks(Collections.singletonList(tableNameWithType), false, 
null);
+  }
+
+  /**
+   * Public API to schedule tasks (all task types) for the given table on a 
specific instance tag.
+   * It might be called from the non-leader controller. Returns a map from the 
task type to the list of tasks scheduled.
+   */
+  public synchronized Map<String, List<String>> scheduleTasks(String 
tableNameWithType,
+      @Nullable String minionInstanceTag) {
+    return scheduleTasks(Collections.singletonList(tableNameWithType), false, 
minionInstanceTag);
   }
 
   /**
@@ -633,8 +645,8 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
    * Returns the list of task names, or {@code null} if no task is scheduled.
    */
   @Nullable
-  public synchronized List<String> scheduleTask(String taskType) {
-    return scheduleTask(taskType, _pinotHelixResourceManager.getAllTables());
+  public synchronized List<String> scheduleTask(String taskType, @Nullable 
String minionInstanceTag) {
+    return scheduleTask(taskType, _pinotHelixResourceManager.getAllTables(), 
minionInstanceTag);
   }
 
   /**
@@ -643,12 +655,13 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
    * Returns the list of task name, or {@code null} if no task is scheduled.
    */
   @Nullable
-  public synchronized List<String> scheduleTaskForDatabase(String taskType, 
@Nullable String database) {
-    return scheduleTask(taskType, 
_pinotHelixResourceManager.getAllTables(database));
+  public synchronized List<String> scheduleTaskForDatabase(String taskType, 
@Nullable String database,
+      @Nullable String minionInstanceTag) {
+    return scheduleTask(taskType, 
_pinotHelixResourceManager.getAllTables(database), minionInstanceTag);
   }
 
   @Nullable
-  private List<String> scheduleTask(String taskType, List<String> tables) {
+  private List<String> scheduleTask(String taskType, List<String> tables, 
@Nullable String minionInstanceTag) {
     PinotTaskGenerator taskGenerator = 
_taskGeneratorRegistry.getTaskGenerator(taskType);
     Preconditions.checkState(taskGenerator != null, "Task type: %s is not 
registered", taskType);
 
@@ -664,7 +677,7 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
 
     _helixTaskResourceManager.ensureTaskQueueExists(taskType);
     addTaskTypeMetricsUpdaterIfNeeded(taskType);
-    return scheduleTask(taskGenerator, enabledTableConfigs, false);
+    return scheduleTask(taskGenerator, enabledTableConfigs, false, 
minionInstanceTag);
   }
 
   /**
@@ -672,7 +685,8 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
    * controller. Returns the list of task names, or {@code null} if no task is 
scheduled.
    */
   @Nullable
-  public synchronized List<String> scheduleTask(String taskType, String 
tableNameWithType) {
+  public synchronized List<String> scheduleTask(String taskType, String 
tableNameWithType,
+      @Nullable String minionInstanceTag) {
     PinotTaskGenerator taskGenerator = 
_taskGeneratorRegistry.getTaskGenerator(taskType);
     Preconditions.checkState(taskGenerator != null, "Task type: %s is not 
registered", taskType);
 
@@ -685,12 +699,12 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
 
     _helixTaskResourceManager.ensureTaskQueueExists(taskType);
     addTaskTypeMetricsUpdaterIfNeeded(taskType);
-    return scheduleTask(taskGenerator, Collections.singletonList(tableConfig), 
false);
+    return scheduleTask(taskGenerator, Collections.singletonList(tableConfig), 
false, minionInstanceTag);
   }
 
   @Override
   protected void processTables(List<String> tableNamesWithType, Properties 
taskProperties) {
-    scheduleTasks(tableNamesWithType, true);
+    scheduleTasks(tableNamesWithType, true, null);
   }
 
   @Override
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
index 1db953f00f..241c1c0876 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
@@ -150,7 +150,7 @@ public class SimpleMinionClusterIntegrationTest extends 
ClusterTest {
     verifyTaskCount(task1.get(0), 0, 1, 1, 2);
     // Should generate one more task, with two sub-tasks. Both of these 
sub-tasks will wait
     // since we have one minion instance that is still running one of the 
sub-tasks.
-    List<String> task2 = _taskManager.scheduleTask(TASK_TYPE);
+    List<String> task2 = _taskManager.scheduleTask(TASK_TYPE, null);
     assertNotNull(task2);
     assertEquals(task2.size(), 1);
     
assertTrue(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).contains(task2.get(0)));
@@ -160,7 +160,7 @@ public class SimpleMinionClusterIntegrationTest extends 
ClusterTest {
     // Our test task generator does not generate if there are already this 
many sub-tasks in the
     // running+waiting count already.
     assertNull(_taskManager.scheduleTasks().get(TASK_TYPE));
-    assertNull(_taskManager.scheduleTask(TASK_TYPE));
+    assertNull(_taskManager.scheduleTask(TASK_TYPE, null));
 
     // Wait at most 60 seconds for all tasks IN_PROGRESS
     TestUtils.waitForCondition(input -> {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to