This is an automated email from the ASF dual-hosted git repository.

manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9546c533a1 Prevent task generation when task queue is in STOPPED state (#14257)
9546c533a1 is described below

commit 9546c533a11f2700ab5e510760750b1869e9f87b
Author: Ragesh Rajagopalan <ragesh.rajagopa...@gmail.com>
AuthorDate: Wed Oct 23 10:20:25 2024 -0700

    Prevent task generation when task queue is in STOPPED state (#14257)
---
 .../helix/core/minion/PinotTaskManager.java        |  29 ++++--
 .../core/minion/PinotTaskManagerStatelessTest.java | 101 +++++++++++++++++++++
 2 files changed, 124 insertions(+), 6 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index f230c5ecce..94facbc377 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -32,6 +32,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -163,6 +164,9 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
         taskConfigs.getOrDefault(MINION_INSTANCE_TAG_CONFIG, 
CommonConstants.Helix.UNTAGGED_MINION_INSTANCE);
     _helixTaskResourceManager.ensureTaskQueueExists(taskType);
     addTaskTypeMetricsUpdaterIfNeeded(taskType);
+    if (!isTaskSchedulable(taskType, List.of(tableName))) {
+      return new HashMap<>();
+    }
     String parentTaskName = 
_helixTaskResourceManager.getParentTaskName(taskType, taskName);
     TaskState taskState = 
_helixTaskResourceManager.getTaskState(parentTaskName);
     if (taskState != null) {
@@ -566,15 +570,13 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
       String taskType = entry.getKey();
       List<TableConfig> enabledTableConfigs = entry.getValue();
       PinotTaskGenerator taskGenerator = 
_taskGeneratorRegistry.getTaskGenerator(taskType);
+      List<String> enabledTables =
+          
enabledTableConfigs.stream().map(TableConfig::getTableName).collect(Collectors.toList());
       if (taskGenerator != null) {
         _helixTaskResourceManager.ensureTaskQueueExists(taskType);
         addTaskTypeMetricsUpdaterIfNeeded(taskType);
         tasksScheduled.put(taskType, scheduleTask(taskGenerator, 
enabledTableConfigs, isLeader, minionInstanceTag));
       } else {
-        List<String> enabledTables = new 
ArrayList<>(enabledTableConfigs.size());
-        for (TableConfig enabledTableConfig : enabledTableConfigs) {
-          enabledTables.add(enabledTableConfig.getTableName());
-        }
         LOGGER.warn("Task type: {} is not registered, cannot enable it for 
tables: {}", taskType, enabledTables);
         tasksScheduled.put(taskType, null);
       }
@@ -611,9 +613,14 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
   @Nullable
   private List<String> scheduleTask(PinotTaskGenerator taskGenerator, 
List<TableConfig> enabledTableConfigs,
       boolean isLeader, @Nullable String minionInstanceTagForTask) {
-    LOGGER.info("Trying to schedule task type: {}, isLeader: {}", 
taskGenerator.getTaskType(), isLeader);
-    Map<String, List<PinotTaskConfig>> minionInstanceTagToTaskConfigs = new 
HashMap<>();
     String taskType = taskGenerator.getTaskType();
+    List<String> enabledTables =
+        
enabledTableConfigs.stream().map(TableConfig::getTableName).collect(Collectors.toList());
+    LOGGER.info("Trying to schedule task type: {}, for tables: {}, isLeader: 
{}", taskType, enabledTables, isLeader);
+    if (!isTaskSchedulable(taskType, enabledTables)) {
+      return null;
+    }
+    Map<String, List<PinotTaskConfig>> minionInstanceTagToTaskConfigs = new 
HashMap<>();
     for (TableConfig tableConfig : enabledTableConfigs) {
       String tableName = tableConfig.getTableName();
       try {
@@ -745,4 +752,14 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
       _taskTypeMetricsUpdaterMap.put(taskType, taskTypeMetricsUpdater);
     }
   }
+
+  private boolean isTaskSchedulable(String taskType, List<String> tables) {
+    TaskState taskQueueState = 
_helixTaskResourceManager.getTaskQueueState(taskType);
+    if (TaskState.STOPPED.equals(taskQueueState) || 
TaskState.STOPPING.equals(taskQueueState)) {
+      LOGGER.warn("Task queue is in state: {}. Tasks won't be created for 
taskType: {} and tables: {}. Resume task "
+          + "queue before attempting to create tasks.", taskQueueState.name(), 
taskType, tables);
+      return false;
+    }
+    return true;
+  }
 }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerStatelessTest.java
index 4abc8bdaa6..e4405502ef 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerStatelessTest.java
@@ -19,14 +19,19 @@
 package org.apache.pinot.controller.helix.core.minion;
 
 import com.google.common.collect.ImmutableMap;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
 import java.util.function.Predicate;
+import org.apache.helix.task.TaskState;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.helix.ControllerTest;
+import 
org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator;
 import org.apache.pinot.controller.util.TableSizeReader;
 import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.minion.PinotTaskConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableTaskConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -83,6 +88,7 @@ public class PinotTaskManagerStatelessTest extends ControllerTest {
     startController(properties);
     addFakeBrokerInstancesToAutoJoinHelixCluster(1, true);
     addFakeServerInstancesToAutoJoinHelixCluster(1, true);
+    addFakeMinionInstancesToAutoJoinHelixCluster(1);
     Schema schema = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
         .addSingleValueDimension("myMap", FieldSpec.DataType.STRING)
         .addSingleValueDimension("myMapStr", FieldSpec.DataType.STRING)
@@ -109,6 +115,101 @@ public class PinotTaskManagerStatelessTest extends ControllerTest {
     stopController();
   }
 
+  private void testValidateTaskGeneration(Function<PinotTaskManager, Void> 
validateFunction)
+      throws Exception {
+    Map<String, Object> properties = getDefaultControllerConfiguration();
+    
properties.put(ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_MANAGER_SCHEDULER_ENABLED,
 true);
+    startController(properties);
+    addFakeBrokerInstancesToAutoJoinHelixCluster(1, true);
+    addFakeServerInstancesToAutoJoinHelixCluster(1, true);
+    addFakeMinionInstancesToAutoJoinHelixCluster(1);
+    Schema schema = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
+        .addSingleValueDimension("myMap", FieldSpec.DataType.STRING)
+        .addSingleValueDimension("myMapStr", FieldSpec.DataType.STRING)
+        .addSingleValueDimension("complexMapStr", 
FieldSpec.DataType.STRING).build();
+    addSchema(schema);
+    PinotTaskManager taskManager = _controllerStarter.getTaskManager();
+    Scheduler scheduler = taskManager.getScheduler();
+    assertNotNull(scheduler);
+
+    String segmentGenerationAndPushTask = 
MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE;
+
+    // Add Table
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTaskConfig(
+        new TableTaskConfig(
+            ImmutableMap.of(segmentGenerationAndPushTask, 
ImmutableMap.of("schedule", "0 */10 * ? * * *")))).build();
+    waitForEVToDisappear(tableConfig.getTableName());
+    addTableConfig(tableConfig);
+    waitForJobGroupNames(taskManager, jgn -> jgn.size() == 1 && 
jgn.contains(segmentGenerationAndPushTask),
+        "JobGroupNames should have SegmentGenerationAndPushTask only");
+    validateJob(segmentGenerationAndPushTask, "0 */10 * ? * * *");
+
+    // Ensure task queue exists
+    PinotHelixTaskResourceManager taskResourceManager = 
_controllerStarter.getHelixTaskResourceManager();
+    taskResourceManager.ensureTaskQueueExists(segmentGenerationAndPushTask);
+
+    // Register the task generator
+    taskManager.registerTaskGenerator(new BaseTaskGenerator() {
+      @Override
+      public String getTaskType() {
+        return segmentGenerationAndPushTask;
+      }
+
+      @Override
+      public List<PinotTaskConfig> generateTasks(List<TableConfig> 
tableConfigs) {
+        // test validates that this method never gets called as the task queue 
is in stopped state
+        return List.of(new PinotTaskConfig(segmentGenerationAndPushTask, new 
HashMap<>()));
+      }
+    });
+
+    // Stop the task queue
+    taskResourceManager.stopTaskQueue(segmentGenerationAndPushTask);
+
+    // Assert the task queue state
+    TestUtils.waitForCondition(aVoid -> {
+      TaskState taskQueueState = 
taskResourceManager.getTaskQueueState(segmentGenerationAndPushTask);
+      return TaskState.STOPPED.equals(taskQueueState);
+    }, TIMEOUT_IN_MS, "task queue state was not in STOPPED state within ten 
seconds.");
+
+    // Exercise the test
+    validateFunction.apply(taskManager);
+
+    // Drop table
+    dropOfflineTable(RAW_TABLE_NAME);
+    waitForJobGroupNames(_controllerStarter.getTaskManager(), List::isEmpty, 
"JobGroupNames should be empty");
+
+    stopFakeInstances();
+    stopController();
+  }
+
+  @Test
+  public void testPinotTaskManagerScheduleTaskWithStoppedTaskQueue()
+      throws Exception {
+    testValidateTaskGeneration(taskManager -> {
+      // Validate schedule tasks for table when task queue is in stopped state
+      List<String> taskIDs = 
taskManager.scheduleTaskForTable("SegmentGenerationAndPushTask", "myTable", 
null);
+      assertNull(taskIDs);
+      return null;
+    });
+  }
+
+  @Test
+  public void testPinotTaskManagerCreateTaskWithStoppedTaskQueue()
+      throws Exception {
+    testValidateTaskGeneration(taskManager -> {
+      Map<String, String> taskMap;
+      try {
+        // Validate task creation for table when task queue is in stopped state
+        taskMap = taskManager.createTask("SegmentGenerationAndPushTask", 
"myTable", "myTaskName", new HashMap<>());
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+      assertNotNull(taskMap);
+      assertEquals(taskMap.size(), 0);
+      return null;
+    });
+  }
+
   @Test
   public void testPinotTaskManagerSchedulerWithUpdate()
       throws Exception {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to