fx19880617 commented on a change in pull request #6352:
URL: https://github.com/apache/incubator-pinot/pull/6352#discussion_r543958481



##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
##########
@@ -63,99 +65,147 @@ public PinotTaskManager(PinotHelixTaskResourceManager 
helixTaskResourceManager,
   }
 
   /**
-   * Returns the cluster info provider.
-   * <p>
-   * Cluster info provider might be useful when initializing task generators.
-   *
-   * @return Cluster info provider
+   * Returns the cluster info accessor.
+   * <p>Cluster info accessor can be used to initialize the task generator.
    */
   public ClusterInfoAccessor getClusterInfoAccessor() {
     return _clusterInfoAccessor;
   }
 
   /**
    * Registers a task generator.
-   * <p>
-   * This method can be used to plug in custom task generators.
-   *
-   * @param pinotTaskGenerator Task generator to be registered
+   * <p>This method can be used to plug in custom task generators.
    */
-  public void registerTaskGenerator(PinotTaskGenerator pinotTaskGenerator) {
-    _taskGeneratorRegistry.registerTaskGenerator(pinotTaskGenerator);
+  public void registerTaskGenerator(PinotTaskGenerator taskGenerator) {
+    _taskGeneratorRegistry.registerTaskGenerator(taskGenerator);
   }
 
   /**
-   * Public API to schedule tasks. It doesn't matter whether current pinot 
controller is leader.
+   * Public API to schedule tasks (all task types) for all tables. It might be 
called from the non-leader controller.
+   * Returns a map from the task type to the task scheduled.
    */
   public synchronized Map<String, String> scheduleTasks() {
-    Map<String, String> tasksScheduled = 
scheduleTasks(_pinotHelixResourceManager.getAllTables());
-
-    // Reset the task because this method will be called from the Rest API 
instead of the periodic task scheduler
-    // TODO: Clean up only the non-leader tables instead of all tables
-    cleanUpTask();
-    setUpTask();
-
-    return tasksScheduled;
+    return scheduleTasks(_pinotHelixResourceManager.getAllTables(), false);

Review comment:
       Why is the `isLeader` argument always `false` here?

##########
File path: 
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
##########
@@ -48,10 +48,7 @@
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.*;

Review comment:
       Please expand the imports instead of using a wildcard static import.

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
##########
@@ -63,99 +65,147 @@ public PinotTaskManager(PinotHelixTaskResourceManager 
helixTaskResourceManager,
   }
 
   /**
-   * Returns the cluster info provider.
-   * <p>
-   * Cluster info provider might be useful when initializing task generators.
-   *
-   * @return Cluster info provider
+   * Returns the cluster info accessor.
+   * <p>Cluster info accessor can be used to initialize the task generator.
    */
   public ClusterInfoAccessor getClusterInfoAccessor() {
     return _clusterInfoAccessor;
   }
 
   /**
    * Registers a task generator.
-   * <p>
-   * This method can be used to plug in custom task generators.
-   *
-   * @param pinotTaskGenerator Task generator to be registered
+   * <p>This method can be used to plug in custom task generators.
    */
-  public void registerTaskGenerator(PinotTaskGenerator pinotTaskGenerator) {
-    _taskGeneratorRegistry.registerTaskGenerator(pinotTaskGenerator);
+  public void registerTaskGenerator(PinotTaskGenerator taskGenerator) {
+    _taskGeneratorRegistry.registerTaskGenerator(taskGenerator);
   }
 
   /**
-   * Public API to schedule tasks. It doesn't matter whether current pinot 
controller is leader.
+   * Public API to schedule tasks (all task types) for all tables. It might be 
called from the non-leader controller.
+   * Returns a map from the task type to the task scheduled.
    */
   public synchronized Map<String, String> scheduleTasks() {
-    Map<String, String> tasksScheduled = 
scheduleTasks(_pinotHelixResourceManager.getAllTables());
-
-    // Reset the task because this method will be called from the Rest API 
instead of the periodic task scheduler
-    // TODO: Clean up only the non-leader tables instead of all tables
-    cleanUpTask();
-    setUpTask();
-
-    return tasksScheduled;
+    return scheduleTasks(_pinotHelixResourceManager.getAllTables(), false);
   }
 
   /**
-   * Check the Pinot cluster status and schedule new tasks for the given 
tables.
-   *
-   * @param tableNamesWithType List of table names with type suffix
-   * @return Map from task type to task scheduled
+   * Helper method to schedule tasks (all task types) for the given tables 
that have the tasks enabled. Returns a map
+   * from the task type to the task scheduled.
    */
-  private synchronized Map<String, String> scheduleTasks(List<String> 
tableNamesWithType) {
+  private synchronized Map<String, String> scheduleTasks(List<String> 
tableNamesWithType, boolean isLeader) {
     
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED,
 1L);
 
     Set<String> taskTypes = _taskGeneratorRegistry.getAllTaskTypes();
-    int numTaskTypes = taskTypes.size();
-    Map<String, List<TableConfig>> enabledTableConfigMap = new 
HashMap<>(numTaskTypes);
-
-    for (String taskType : taskTypes) {
-      enabledTableConfigMap.put(taskType, new ArrayList<>());
-
-      // Ensure all task queues exist
-      _helixTaskResourceManager.ensureTaskQueueExists(taskType);
-    }
+    Map<String, List<TableConfig>> enabledTableConfigMap = new HashMap<>();
 
     // Scan all table configs to get the tables with tasks enabled
     for (String tableNameWithType : tableNamesWithType) {
       TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
-      if (tableConfig != null) {
-        TableTaskConfig taskConfig = tableConfig.getTaskConfig();
-        if (taskConfig != null) {
-          for (String taskType : taskTypes) {
-            if (taskConfig.isTaskTypeEnabled(taskType)) {
-              enabledTableConfigMap.get(taskType).add(tableConfig);
-            }
+      if (tableConfig != null && tableConfig.getTaskConfig() != null) {
+        Set<String> enabledTaskTypes = 
tableConfig.getTaskConfig().getTaskTypeConfigsMap().keySet();
+        for (String enabledTaskType : enabledTaskTypes) {
+          if (taskTypes.contains(enabledTaskType)) {
+            enabledTableConfigMap.computeIfAbsent(enabledTaskType, k -> new 
ArrayList<>()).add(tableConfig);
+          } else {
+            LOGGER.warn("Task type: {} is not registered, cannot enable it for 
table: {}", enabledTaskType,

Review comment:
       Is it possible to return this info (the unregistered task type warning) 
to the client side? I feel it would be useful for users when testing and debugging.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to