This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f616be74201 Perform task data cleanup as part of table deletion 
(#16307)
f616be74201 is described below

commit f616be74201ea0e68bf01443e2db4aab19a79027
Author: Shounak kulkarni <shounakmk...@gmail.com>
AuthorDate: Wed Jul 16 17:53:17 2025 +0530

    Perform task data cleanup as part of table deletion (#16307)
---
 .../api/resources/PinotTableRestletResource.java   |  83 ++++-
 .../api/resources/TableConfigsRestletResource.java |  26 +-
 .../core/minion/PinotHelixTaskResourceManager.java |   7 +-
 .../api/PinotTableRestletResourceTest.java         | 349 +++++++++++++++------
 .../utils/builder/ControllerRequestURLBuilder.java |  16 +
 5 files changed, 373 insertions(+), 108 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index bd97c37d1e5..7f1dae48634 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -72,6 +72,7 @@ import 
org.apache.hc.client5.http.io.HttpClientConnectionManager;
 import org.apache.helix.AccessOption;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.task.TaskState;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.exception.RebalanceInProgressException;
@@ -207,8 +208,10 @@ public class PinotTableRestletResource {
   @ManualAuthorization // performed after parsing table configs
   public ConfigSuccessResponse addTable(String tableConfigStr,
       @ApiParam(value = "comma separated list of validation type(s) to skip. 
supported types: (ALL|TASK|UPSERT)")
-      @QueryParam("validationTypesToSkip") @Nullable String typesToSkip, 
@Context HttpHeaders httpHeaders,
-      @Context Request request) {
+      @QueryParam("validationTypesToSkip") @Nullable String typesToSkip,
+      @ApiParam(defaultValue = "false") @QueryParam("ignoreActiveTasks") 
boolean ignoreActiveTasks,
+      @Context HttpHeaders httpHeaders, @Context Request request)
+      throws IOException {
     // TODO introduce a table config ctor with json string.
     Pair<TableConfig, Map<String, Object>> 
tableConfigAndUnrecognizedProperties;
     TableConfig tableConfig;
@@ -246,6 +249,9 @@ public class PinotTableRestletResource {
       } catch (Exception e) {
         throw new InvalidTableConfigException(e);
       }
+      if (!ignoreActiveTasks) {
+        tableTasksValidation(tableConfig, _pinotHelixTaskResourceManager);
+      }
       _pinotHelixResourceManager.addTable(tableConfig);
       // TODO: validate that table was created successfully
       // (in realtime case, metadata might not have been created but would be 
created successfully in the next run of
@@ -260,6 +266,8 @@ public class PinotTableRestletResource {
         throw new ControllerApplicationException(LOGGER, errStr, 
Response.Status.BAD_REQUEST, e);
       } else if (e instanceof TableAlreadyExistsException) {
         throw new ControllerApplicationException(LOGGER, e.getMessage(), 
Response.Status.CONFLICT, e);
+      } else if (e instanceof ControllerApplicationException) {
+        throw e;
       } else {
         throw new ControllerApplicationException(LOGGER, e.getMessage(), 
Response.Status.INTERNAL_SERVER_ERROR, e);
       }
@@ -426,6 +434,7 @@ public class PinotTableRestletResource {
       @ApiParam(value = "Retention period for the table segments (e.g. 12h, 
3d); If not set, the retention period "
           + "will default to the first config that's not null: the cluster 
setting, then '7d'. Using 0d or -1d will "
           + "instantly delete segments without retention") 
@QueryParam("retention") String retentionPeriod,
+      @ApiParam(defaultValue = "false") @QueryParam("ignoreActiveTasks") 
boolean ignoreActiveTasks,
       @Context HttpHeaders headers) {
     TableType tableType = Constants.validateTableType(tableTypeStr);
 
@@ -435,21 +444,25 @@ public class PinotTableRestletResource {
       validateLogicalTableReference(tableName, tableType);
       boolean tableExist = false;
       if (verifyTableType(tableName, tableType, TableType.OFFLINE)) {
+        String tableWithType = 
TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+        tableTasksCleanup(tableWithType, ignoreActiveTasks, 
_pinotHelixResourceManager, _pinotHelixTaskResourceManager);
         tableExist = _pinotHelixResourceManager.hasOfflineTable(tableName);
         // Even the table name does not exist, still go on to delete remaining 
table metadata in case a previous delete
         // did not complete.
         _pinotHelixResourceManager.deleteOfflineTable(tableName, 
retentionPeriod);
         if (tableExist) {
-          
tablesDeleted.add(TableNameBuilder.OFFLINE.tableNameWithType(tableName));
+          tablesDeleted.add(tableWithType);
         }
       }
       if (verifyTableType(tableName, tableType, TableType.REALTIME)) {
+        String tableWithType = 
TableNameBuilder.REALTIME.tableNameWithType(tableName);
+        tableTasksCleanup(tableWithType, ignoreActiveTasks, 
_pinotHelixResourceManager, _pinotHelixTaskResourceManager);
         tableExist = _pinotHelixResourceManager.hasRealtimeTable(tableName);
         // Even the table name does not exist, still go on to delete remaining 
table metadata in case a previous delete
         // did not complete.
         _pinotHelixResourceManager.deleteRealtimeTable(tableName, 
retentionPeriod);
         if (tableExist) {
-          
tablesDeleted.add(TableNameBuilder.REALTIME.tableNameWithType(tableName));
+          tablesDeleted.add(tableWithType);
         }
       }
       if (!tablesDeleted.isEmpty()) {
@@ -463,6 +476,68 @@ public class PinotTableRestletResource {
         "Table '" + tableName + "' with type " + tableType + " does not 
exist", Response.Status.NOT_FOUND);
   }
 
+  /**
+   * Rejects adding a table whose configured task types still have task data recorded for the same table name,
+   * e.g. data left behind by a table delete that did not complete.
+   *
+   * <p>Task types for which {@code getTaskStatesByTable} throws {@link IllegalArgumentException} are logged and
+   * skipped; validation then continues with the remaining task types.
+   *
+   * @param tableConfig config of the table being added; nothing is checked when it has no task config
+   * @param pinotHelixTaskResourceManager used to look up existing task states per task type
+   * @throws ControllerApplicationException with status 400 if any configured task type has task data for the table
+   */
+  public static void tableTasksValidation(TableConfig tableConfig,
+      PinotHelixTaskResourceManager pinotHelixTaskResourceManager) {
+    if (tableConfig.getTaskConfig() == null) {
+      return;
+    }
+    String tableWithType = tableConfig.getTableName();
+    Map<String, Map<String, String>> taskTypeConfigsMap = tableConfig.getTaskConfig().getTaskTypeConfigsMap();
+    for (String taskType : taskTypeConfigsMap.keySet()) {
+      Map<String, TaskState> taskStates;
+      try {
+        taskStates = pinotHelixTaskResourceManager.getTaskStatesByTable(taskType, tableWithType);
+      } catch (IllegalArgumentException e) {
+        // The lookup failed for this task type only. Keep validating the remaining task types; returning here
+        // would silently skip every task type that comes after this one.
+        LOGGER.info(e.getMessage());
+        continue;
+      }
+      if (!taskStates.isEmpty()) {
+        throw new ControllerApplicationException(LOGGER, "The table has dangling task data, try performing table "
+            + "delete operation in case the delete operation was not completed successfully, else delete the tasks "
+            + "manually through DELETE /tasks/task/{taskName} endpoint. Please try again once the dangling tasks are "
+            + "cleaned up", Response.Status.BAD_REQUEST);
+      }
+    }
+  }
+
+  /**
+   * Clears the minion task state of a table as part of deleting it.
+   *
+   * <p>First removes {@link PinotTaskManager#SCHEDULE_KEY} from every task type in the table config. The config is
+   * written back only if a schedule was actually removed; this stops new tasks from being scheduled while the table
+   * is deleted. Then deletes every task of the configured task types for this table that is not
+   * {@link TaskState#IN_PROGRESS}. In-progress tasks are left untouched. Does nothing if the table config or its
+   * task config is missing.
+   *
+   * @param tableWithType table name with type suffix
+   * @param ignoreActiveTasks if {@code false}, fail when in-progress tasks remain; if {@code true}, only log them
+   * @param pinotHelixResourceManager used to read and update the table config
+   * @param pinotHelixTaskResourceManager used to look up and delete the table's tasks
+   * @throws ControllerApplicationException with status 400 if in-progress tasks remain and
+   *     {@code ignoreActiveTasks} is {@code false}
+   * @throws IOException propagated from the resource manager calls
+   */
+  public static void tableTasksCleanup(String tableWithType, boolean ignoreActiveTasks,
+      PinotHelixResourceManager pinotHelixResourceManager, PinotHelixTaskResourceManager pinotHelixTaskResourceManager)
+      throws IOException {
+    TableConfig tableConfig = pinotHelixResourceManager.getTableConfig(tableWithType);
+    if (tableConfig == null || tableConfig.getTaskConfig() == null) {
+      return;
+    }
+    Map<String, Map<String, String>> taskTypeConfigsMap = tableConfig.getTaskConfig().getTaskTypeConfigsMap();
+    Set<String> taskTypes = taskTypeConfigsMap.keySet();
+    // Remove the task schedules to avoid tasks being scheduled during table deletion. Only persist the config when
+    // a schedule was actually present, to avoid a needless table config update.
+    boolean scheduleRemoved = false;
+    for (Map<String, String> taskTypeConfigs : taskTypeConfigsMap.values()) {
+      if (taskTypeConfigs != null && taskTypeConfigs.containsKey(PinotTaskManager.SCHEDULE_KEY)) {
+        taskTypeConfigs.remove(PinotTaskManager.SCHEDULE_KEY);
+        scheduleRemoved = true;
+      }
+    }
+    if (scheduleRemoved) {
+      pinotHelixResourceManager.updateTableConfig(tableConfig);
+    }
+    List<String> pendingTasks = new ArrayList<>();
+    for (String taskType : taskTypes) {
+      Map<String, TaskState> taskStates;
+      try {
+        taskStates = pinotHelixTaskResourceManager.getTaskStatesByTable(taskType, tableWithType);
+      } catch (IllegalArgumentException e) {
+        LOGGER.info(e.getMessage());
+        continue;
+      }
+      for (Map.Entry<String, TaskState> taskEntry : taskStates.entrySet()) {
+        if (TaskState.IN_PROGRESS.equals(taskEntry.getValue())) {
+          pendingTasks.add(taskEntry.getKey());
+        } else {
+          pinotHelixTaskResourceManager.deleteTask(taskEntry.getKey(), true);
+        }
+      }
+    }
+    if (pendingTasks.isEmpty()) {
+      return;
+    }
+    if (!ignoreActiveTasks) {
+      throw new ControllerApplicationException(LOGGER, "The table has " + pendingTasks.size() + " active running tasks "
+          + ": " + pendingTasks + ". The task schedules have been cleared, so new tasks should not be generated. "
+          + "Please try again once there are no more active tasks", Response.Status.BAD_REQUEST);
+    }
+    // The caller chose to proceed anyway; leave a trace of the in-progress tasks that were not cleaned up.
+    LOGGER.warn("Ignoring {} in-progress tasks for table: {}: {}", pendingTasks.size(), tableWithType, pendingTasks);
+  }
+
   //   Return true iff the table is of the expectedType based on the given 
tableName and tableType. The truth table:
   //        tableType   TableNameBuilder.getTableTypeFromTableName(tableName)  
 Return value
   //     1. null      null (i.e., table has no type suffix)           true
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigsRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigsRestletResource.java
index 3a15ecb30e8..1a26ddc8265 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigsRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigsRestletResource.java
@@ -64,6 +64,7 @@ import 
org.apache.pinot.controller.api.exception.ControllerApplicationException;
 import org.apache.pinot.controller.api.exception.InvalidTableConfigException;
 import org.apache.pinot.controller.api.exception.TableAlreadyExistsException;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import 
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
 import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
 import org.apache.pinot.controller.tuner.TableConfigTunerUtils;
 import org.apache.pinot.controller.util.TaskConfigUtils;
@@ -108,6 +109,9 @@ public class TableConfigsRestletResource {
   @Inject
   PinotHelixResourceManager _pinotHelixResourceManager;
 
+  @Inject
+  PinotHelixTaskResourceManager _pinotHelixTaskResourceManager;
+
   @Inject
   PinotTaskManager _pinotTaskManager;
 
@@ -192,8 +196,10 @@ public class TableConfigsRestletResource {
   public ConfigSuccessResponse addConfig(
       String tableConfigsStr,
       @ApiParam(value = "comma separated list of validation type(s) to skip. 
supported types: (ALL|TASK|UPSERT)")
-      @QueryParam("validationTypesToSkip") @Nullable String typesToSkip, 
@Context HttpHeaders httpHeaders,
-      @Context Request request) {
+      @QueryParam("validationTypesToSkip") @Nullable String typesToSkip,
+      @ApiParam(defaultValue = "false") @QueryParam("ignoreActiveTasks") 
boolean ignoreActiveTasks,
+      @Context HttpHeaders httpHeaders, @Context Request request)
+      throws Exception {
     Pair<TableConfigs, Map<String, Object>> tableConfigsAndUnrecognizedProps;
     try {
       tableConfigsAndUnrecognizedProps =
@@ -231,9 +237,15 @@ public class TableConfigsRestletResource {
 
       if (offlineTableConfig != null) {
         tuneConfig(offlineTableConfig, schema);
+        if (!ignoreActiveTasks) {
+          PinotTableRestletResource.tableTasksValidation(offlineTableConfig, 
_pinotHelixTaskResourceManager);
+        }
       }
       if (realtimeTableConfig != null) {
         tuneConfig(realtimeTableConfig, schema);
+        if (!ignoreActiveTasks) {
+          PinotTableRestletResource.tableTasksValidation(realtimeTableConfig, 
_pinotHelixTaskResourceManager);
+        }
       }
       try {
         _pinotHelixResourceManager.addSchema(schema, false, false);
@@ -264,6 +276,8 @@ public class TableConfigsRestletResource {
             rawTableName, e.getMessage()), Response.Status.BAD_REQUEST, e);
       } else if (e instanceof TableAlreadyExistsException) {
         throw new ControllerApplicationException(LOGGER, e.getMessage(), 
Response.Status.CONFLICT, e);
+      } else if (e instanceof ControllerApplicationException) {
+        throw e;
       } else {
         throw new ControllerApplicationException(LOGGER, e.getMessage(), 
Response.Status.INTERNAL_SERVER_ERROR, e);
       }
@@ -283,7 +297,9 @@ public class TableConfigsRestletResource {
   @ApiOperation(value = "Delete the TableConfigs", notes = "Delete the 
TableConfigs")
   public SuccessResponse deleteConfig(
       @ApiParam(value = "TableConfigs name i.e. raw table name", required = 
true) @PathParam("tableName")
-      String tableName, @Context HttpHeaders headers) {
+      String tableName,
+      @ApiParam(defaultValue = "false") @QueryParam("ignoreActiveTasks") 
boolean ignoreActiveTasks,
+      @Context HttpHeaders headers) {
     try {
       if (TableNameBuilder.isOfflineTableResource(tableName) || 
TableNameBuilder.isRealtimeTableResource(tableName)) {
         throw new ControllerApplicationException(LOGGER, "Invalid table name: 
" + tableName + ". Use raw table name.",
@@ -306,9 +322,13 @@ public class TableConfigsRestletResource {
       boolean tableExists =
           _pinotHelixResourceManager.hasRealtimeTable(tableName) || 
_pinotHelixResourceManager.hasOfflineTable(
               tableName);
+      
PinotTableRestletResource.tableTasksCleanup(TableNameBuilder.REALTIME.tableNameWithType(tableName),
+          ignoreActiveTasks, _pinotHelixResourceManager, 
_pinotHelixTaskResourceManager);
       // Delete whether tables exist or not
       _pinotHelixResourceManager.deleteRealtimeTable(tableName);
       LOGGER.info("Deleted realtime table: {}", tableName);
+      
PinotTableRestletResource.tableTasksCleanup(TableNameBuilder.OFFLINE.tableNameWithType(tableName),
+          ignoreActiveTasks, _pinotHelixResourceManager, 
_pinotHelixTaskResourceManager);
       _pinotHelixResourceManager.deleteOfflineTable(tableName);
       LOGGER.info("Deleted offline table: {}", tableName);
       boolean schemaExists = 
_pinotHelixResourceManager.deleteSchema(tableName);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
index e1620f7c935..6f1fb4c0a22 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
@@ -524,8 +524,11 @@ public class PinotHelixTaskResourceManager {
    * @return List of child task configs
    */
   public synchronized List<PinotTaskConfig> getSubtaskConfigs(String taskName) 
{
-    Collection<TaskConfig> helixTaskConfigs =
-        
_taskDriver.getJobConfig(getHelixJobName(taskName)).getTaskConfigMap().values();
+    JobConfig jobConfig = _taskDriver.getJobConfig(getHelixJobName(taskName));
+    if (jobConfig == null) {
+      return List.of();
+    }
+    Collection<TaskConfig> helixTaskConfigs = 
jobConfig.getTaskConfigMap().values();
     List<PinotTaskConfig> taskConfigs = new 
ArrayList<>(helixTaskConfigs.size());
     for (TaskConfig helixTaskConfig : helixTaskConfigs) {
       taskConfigs.add(PinotTaskConfig.fromHelixTaskConfig(helixTaskConfig));
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
index e464032ee8f..7af7ad37345 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
@@ -29,10 +29,15 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import javax.ws.rs.core.Response;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.helix.task.TaskState;
 import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
 import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
+import org.apache.pinot.controller.helix.core.minion.TaskSchedulingContext;
+import org.apache.pinot.controller.helix.core.minion.TaskSchedulingInfo;
 import 
org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
 import org.apache.pinot.core.common.MinionConstants;
@@ -50,18 +55,21 @@ import 
org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.StringUtil;
 import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.mockito.Mockito;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import static org.mockito.ArgumentMatchers.any;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
@@ -76,8 +84,8 @@ import static org.testng.Assert.fail;
 public class PinotTableRestletResourceTest extends ControllerTest {
   private static final String OFFLINE_TABLE_NAME = "testOfflineTable";
   private static final String REALTIME_TABLE_NAME = "testRealtimeTable";
-  private final TableConfigBuilder _offlineBuilder = new 
TableConfigBuilder(TableType.OFFLINE);
-  private final TableConfigBuilder _realtimeBuilder = new 
TableConfigBuilder(TableType.REALTIME);
+  private final TableConfigBuilder _offlineBuilder = 
getOfflineTableBuilder(OFFLINE_TABLE_NAME);
+  private final TableConfigBuilder _realtimeBuilder = 
getRealtimeTableBuilder(REALTIME_TABLE_NAME);
   private String _createTableUrl;
 
   @BeforeClass
@@ -86,13 +94,25 @@ public class PinotTableRestletResourceTest extends 
ControllerTest {
     DEFAULT_INSTANCE.setupSharedStateAndValidate();
     registerMinionTasks();
     _createTableUrl = 
DEFAULT_INSTANCE.getControllerRequestURLBuilder().forTableCreate();
-    
_offlineBuilder.setTableName(OFFLINE_TABLE_NAME).setTimeColumnName("timeColumn").setTimeType("DAYS")
-        .setRetentionTimeUnit("DAYS").setRetentionTimeValue("5");
+  }
+
+  /** Builds a realtime table config with a daily time column, 5-day retention and the default fake stream configs. */
+  private TableConfigBuilder getRealtimeTableBuilder(String tableName) {
+    Map<String, String> streamConfigsMap =
+        FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap();
+    return new TableConfigBuilder(TableType.REALTIME).setTableName(tableName)
+        .setTimeColumnName("timeColumn").setTimeType("DAYS")
+        .setRetentionTimeUnit("DAYS").setRetentionTimeValue("5")
+        .setStreamConfigs(streamConfigsMap);
+  }
 
-    StreamConfig streamConfig = 
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs();
-    
_realtimeBuilder.setTableName(REALTIME_TABLE_NAME).setTimeColumnName("timeColumn").setTimeType("DAYS")
-        .setRetentionTimeUnit("DAYS").setRetentionTimeValue("5")
-        .setStreamConfigs(streamConfig.getStreamConfigsMap());
+  /** Builds an offline table config with a daily time column and 5-day retention. */
+  private TableConfigBuilder getOfflineTableBuilder(String tableName) {
+    return new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName)
+        .setTimeColumnName("timeColumn").setTimeType("DAYS")
+        .setRetentionTimeUnit("DAYS").setRetentionTimeValue("5");
   }
 
   @BeforeMethod
@@ -104,39 +124,29 @@ public class PinotTableRestletResourceTest extends 
ControllerTest {
 
+  /**
+   * Registers stub task generators for the SegmentGenerationAndPush, MergeRollup and RealtimeToOfflineSegments task
+   * types on the controller's task manager, all initialized with the same mocked {@link ClusterInfoAccessor}.
+   */
   private void registerMinionTasks() {
     PinotTaskManager taskManager = 
DEFAULT_INSTANCE.getControllerStarter().getTaskManager();
-    taskManager.registerTaskGenerator(new BaseTaskGenerator() {
-      @Override
-      public String getTaskType() {
-        return MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE;
-      }
-
-      @Override
-      public List<PinotTaskConfig> generateTasks(List<TableConfig> 
tableConfigs) {
-        return List.of(new 
PinotTaskConfig(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE, new 
HashMap<>()));
-      }
-    });
-    taskManager.registerTaskGenerator(new BaseTaskGenerator() {
-      @Override
-      public String getTaskType() {
-        return MinionConstants.MergeRollupTask.TASK_TYPE;
-      }
+    ClusterInfoAccessor clusterInfoAccessor = 
Mockito.mock(ClusterInfoAccessor.class);
+    // NOTE(review): if getClusterConfig returns a plain object type, Mockito's default answer already yields null,
+    // so this stub only makes that explicit.
+    Mockito.when(clusterInfoAccessor.getClusterConfig(any())).thenReturn(null);
+    
registerTaskGenerator(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE, 
taskManager, clusterInfoAccessor);
+    registerTaskGenerator(MinionConstants.MergeRollupTask.TASK_TYPE, 
taskManager, clusterInfoAccessor);
+    
registerTaskGenerator(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, 
taskManager, clusterInfoAccessor);
+  }
 
-      @Override
-      public List<PinotTaskConfig> generateTasks(List<TableConfig> 
tableConfigs) {
-        return List.of(new 
PinotTaskConfig(MinionConstants.MergeRollupTask.TASK_TYPE, new HashMap<>()));
-      }
-    });
-    taskManager.registerTaskGenerator(new BaseTaskGenerator() {
+  /**
+   * Registers with {@code taskManager} a stub generator for {@code taskType}, initialized with
+   * {@code clusterInfoAccessor}. Each {@code generateTasks} call returns a single task whose configs are the
+   * {@code taskType} configs of the first table config passed in.
+   */
+  private static void registerTaskGenerator(String taskType, PinotTaskManager 
taskManager,
+      ClusterInfoAccessor clusterInfoAccessor) {
+    BaseTaskGenerator taskGenerator = new BaseTaskGenerator() {
       @Override
       public String getTaskType() {
-        return MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE;
+        return taskType;
       }
 
       @Override
       public List<PinotTaskConfig> generateTasks(List<TableConfig> 
tableConfigs) {
-        return List.of(new 
PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, new 
HashMap<>()));
+        // NOTE(review): assumes a non-empty list whose first table has a task config for this task type.
+        return List.of(new PinotTaskConfig(taskType,
+            
tableConfigs.get(0).getTaskConfig().getConfigsForTaskType(getTaskType())));
       }
-    });
+    };
+    taskGenerator.init(clusterInfoAccessor);
+    taskManager.registerTaskGenerator(taskGenerator);
   }
 
   @Test
@@ -881,13 +891,7 @@ public class PinotTableRestletResourceTest extends 
ControllerTest {
     instanceAssignmentConfigMap.put(InstancePartitionsType.CONSUMING.name(),
         getInstanceAssignmentConfig("DefaultTenant_REALTIME", 4, 2));
 
-    TableConfig realtimeTableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(tableName)
-        .setServerTenant("DefaultTenant")
-        .setTimeColumnName("timeColumn")
-        .setTimeType("DAYS")
-        .setRetentionTimeUnit("DAYS")
-        .setRetentionTimeValue("5")
-        
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap())
+    TableConfig realtimeTableConfig = getRealtimeTableBuilder(tableName)
         .setInstanceAssignmentConfigMap(instanceAssignmentConfigMap)
         .setNumReplicas(10)
         .build();
@@ -903,13 +907,7 @@ public class PinotTableRestletResourceTest extends 
ControllerTest {
     instanceAssignmentConfigMap.put(InstancePartitionsType.CONSUMING.name(),
         getInstanceAssignmentConfig("DefaultTenant_REALTIME", 4, 1));
 
-    realtimeTableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(tableName)
-        .setServerTenant("DefaultTenant")
-        .setTimeColumnName("timeColumn")
-        .setTimeType("DAYS")
-        .setRetentionTimeUnit("DAYS")
-        .setRetentionTimeValue("5")
-        
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap())
+    realtimeTableConfig = getRealtimeTableBuilder(tableName)
         .setInstanceAssignmentConfigMap(instanceAssignmentConfigMap)
         .setNumReplicas(10)
         .build();
@@ -992,23 +990,11 @@ public class PinotTableRestletResourceTest extends 
ControllerTest {
    */
   private void validateTableUpdateReplicationToInvalidValue(String 
rawTableName, TableType tableType) {
     String tableNameWithType = 
TableNameBuilder.forType(tableType).tableNameWithType(rawTableName);
-    TableConfig tableConfig =
-        tableType == TableType.REALTIME ? new 
TableConfigBuilder(TableType.REALTIME).setTableName(rawTableName)
-            .setServerTenant("DefaultTenant")
-            .setTimeColumnName("timeColumn")
-            .setTimeType("DAYS")
-            .setRetentionTimeUnit("DAYS")
-            .setRetentionTimeValue("5")
-            
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap())
-            .setNumReplicas(5)
-            .build() : new 
TableConfigBuilder(TableType.OFFLINE).setTableName(rawTableName)
-            .setServerTenant("DefaultTenant")
-            .setTimeColumnName("timeColumn")
-            .setTimeType("DAYS")
-            .setRetentionTimeUnit("DAYS")
-            .setRetentionTimeValue("5")
-            .setNumReplicas(5)
-            .build();
+    TableConfig tableConfig = (tableType == TableType.REALTIME
+        ? getRealtimeTableBuilder(rawTableName)
+        : getOfflineTableBuilder(rawTableName))
+        .setNumReplicas(5)
+        .build();
 
     try {
       sendPostRequest(_createTableUrl, tableConfig.toJsonString());
@@ -1018,23 +1004,11 @@ public class PinotTableRestletResourceTest extends 
ControllerTest {
   }
 
   private void createTableWithValidReplication(String rawTableName, TableType 
tableType) {
-    TableConfig tableConfig =
-        tableType == TableType.REALTIME ? new 
TableConfigBuilder(TableType.REALTIME).setTableName(rawTableName)
-            .setServerTenant("DefaultTenant")
-            .setTimeColumnName("timeColumn")
-            .setTimeType("DAYS")
-            .setRetentionTimeUnit("DAYS")
-            .setRetentionTimeValue("5")
-            
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap())
-            .setNumReplicas(1)
-            .build() : new 
TableConfigBuilder(TableType.OFFLINE).setTableName(rawTableName)
-            .setServerTenant("DefaultTenant")
-            .setTimeColumnName("timeColumn")
-            .setTimeType("DAYS")
-            .setRetentionTimeUnit("DAYS")
-            .setRetentionTimeValue("5")
-            .setNumReplicas(1)
-            .build();
+    TableConfig tableConfig = (tableType == TableType.REALTIME
+        ? getRealtimeTableBuilder(rawTableName)
+        : getOfflineTableBuilder(rawTableName))
+        .setNumReplicas(1)
+        .build();
 
     try {
       sendPostRequest(_createTableUrl, tableConfig.toJsonString());
@@ -1051,23 +1025,11 @@ public class PinotTableRestletResourceTest extends 
ControllerTest {
       throws IOException {
     String tableNameWithType = 
TableNameBuilder.forType(tableType).tableNameWithType(rawTableName);
     DEFAULT_INSTANCE.addDummySchema(rawTableName);
-    TableConfig tableConfig =
-        tableType == TableType.REALTIME ? new 
TableConfigBuilder(TableType.REALTIME).setTableName(rawTableName)
-            .setServerTenant("DefaultTenant")
-            .setTimeColumnName("timeColumn")
-            .setTimeType("DAYS")
-            .setRetentionTimeUnit("DAYS")
-            .setRetentionTimeValue("5")
-            
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap())
-            .setNumReplicas(5)
-            .build() : new 
TableConfigBuilder(TableType.OFFLINE).setTableName(rawTableName)
-            .setServerTenant("DefaultTenant")
-            .setTimeColumnName("timeColumn")
-            .setTimeType("DAYS")
-            .setRetentionTimeUnit("DAYS")
-            .setRetentionTimeValue("5")
-            .setNumReplicas(5)
-            .build();
+    TableConfig tableConfig = (tableType == TableType.REALTIME
+        ? getRealtimeTableBuilder(rawTableName)
+        : getOfflineTableBuilder(rawTableName))
+        .setNumReplicas(5)
+        .build();
 
     try {
       sendPostRequest(_createTableUrl, tableConfig.toJsonString());
@@ -1090,6 +1052,195 @@ public class PinotTableRestletResourceTest extends 
ControllerTest {
         
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.name(),
 false);
   }
 
+  @Test
+  public void testTableTasksValidationWithNoDanglingTasks()
+      throws Exception {
+    String tableName = "testTableTasksValidation";
+    DEFAULT_INSTANCE.addDummySchema(tableName);
+
+    TableConfig offlineTableConfig = getOfflineTableBuilder(tableName)
+        .setTaskConfig(new TableTaskConfig(ImmutableMap.of(
+            MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE, 
ImmutableMap.of())))
+        .build();
+
+    // Should succeed when no dangling tasks exist
+    String creationResponse = sendPostRequest(_createTableUrl, 
offlineTableConfig.toJsonString());
+    assertEquals(creationResponse,
+        "{\"unrecognizedProperties\":{},\"status\":\"Table 
testTableTasksValidation_OFFLINE successfully added\"}");
+
+    // Clean up
+    
sendDeleteRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder().forTableDelete(tableName));
+  }
+
+  @Test
+  public void testTableTasksValidationWithDanglingTasks()
+      throws Exception {
+    String tableName = "testTableTasksValidationWithDangling";
+    DEFAULT_INSTANCE.addDummySchema(tableName);
+
+    TableConfig offlineTableConfig = getOfflineTableBuilder(tableName)
+        .setTaskConfig(new TableTaskConfig(ImmutableMap.of(
+            MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE,
+            ImmutableMap.of(PinotTaskManager.SCHEDULE_KEY, "0 */10 * ? * * *",
+                CommonConstants.TABLE_NAME, tableName + "_OFFLINE"))))
+        .build();
+
+    // First create the table successfully
+    sendPostRequest(_createTableUrl, offlineTableConfig.toJsonString());
+
+    // Create a task manually to simulate dangling task
+    PinotTaskManager taskManager = 
DEFAULT_INSTANCE.getControllerStarter().getTaskManager();
+    TaskSchedulingContext context = new TaskSchedulingContext();
+    context.setTablesToSchedule(Set.of(tableName + "_OFFLINE"));
+    Map<String, TaskSchedulingInfo> taskInfo = 
taskManager.scheduleTasks(context);
+    String taskName = 
taskInfo.values().iterator().next().getScheduledTaskNames().get(0);
+    waitForTaskState(taskName, TaskState.IN_PROGRESS);
+
+    // Now try to create another table with same name (simulating re-creation 
with dangling tasks)
+    sendDeleteRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder()
+        .forTableDelete(tableName + "?ignoreActiveTasks=true"));
+
+    try {
+      sendPostRequest(_createTableUrl, offlineTableConfig.toJsonString());
+      fail("Table creation should fail when dangling tasks exist");
+    } catch (IOException e) {
+      assertTrue(e.getMessage().contains("The table has dangling task data"));
+    }
+
+    // Clean up any remaining tasks
+    try {
+      sendDeleteRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder()
+          .forTableDelete(tableName + "?ignoreActiveTasks=true"));
+    } catch (Exception ignored) {
+      // Ignore if table doesn't exist
+    }
+  }
+
+  @Test
+  public void testTableTasksValidationWithNullTaskConfig()
+      throws Exception {
+    String tableName = "testTableTasksValidationNullConfig";
+    DEFAULT_INSTANCE.addDummySchema(tableName);
+
+    TableConfig offlineTableConfig = 
getOfflineTableBuilder(tableName).build(); // No task config
+
+    // Should succeed when task config is null
+    String creationResponse = sendPostRequest(_createTableUrl, 
offlineTableConfig.toJsonString());
+    assertEquals(creationResponse, "{\"unrecognizedProperties\":{},"
+        + "\"status\":\"Table testTableTasksValidationNullConfig_OFFLINE 
successfully added\"}");
+
+    // Clean up
+    
sendDeleteRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder().forTableDelete(tableName));
+  }
+
+  @Test
+  public void testTableTasksCleanupWithNonActiveTasks()
+      throws Exception {
+    String tableName = "testTableTasksCleanup";
+    DEFAULT_INSTANCE.addDummySchema(tableName);
+
+    TableConfig offlineTableConfig = getOfflineTableBuilder(tableName)
+        .setTaskConfig(new TableTaskConfig(ImmutableMap.of(
+            MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE,
+            ImmutableMap.of(PinotTaskManager.SCHEDULE_KEY, "0 */10 * ? * * *",
+                CommonConstants.TABLE_NAME, tableName + "_OFFLINE"))))
+        .build();
+
+    // Create table
+    sendPostRequest(_createTableUrl, offlineTableConfig.toJsonString());
+
+    // Create some completed tasks
+    PinotTaskManager taskManager = 
DEFAULT_INSTANCE.getControllerStarter().getTaskManager();
+    TaskSchedulingContext context = new TaskSchedulingContext();
+    context.setTablesToSchedule(Set.of(tableName + "_OFFLINE"));
+    Map<String, TaskSchedulingInfo> taskInfo = 
taskManager.scheduleTasks(context);
+    String taskName = 
taskInfo.values().iterator().next().getScheduledTaskNames().get(0);
+    waitForTaskState(taskName, TaskState.IN_PROGRESS);
+
+    // stop the task queue to abort the task
+    sendPutRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder()
+        
.forStopMinionTaskQueue(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE));
+    waitForTaskState(taskName, TaskState.STOPPED);
+    // resume the task queue again to avoid affecting other tests
+    sendPutRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder()
+        
.forResumeMinionTaskQueue(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE));
+
+    // Delete table - should succeed and clean up tasks
+    String deleteResponse = sendDeleteRequest(
+        
DEFAULT_INSTANCE.getControllerRequestURLBuilder().forTableDelete(tableName));
+    assertEquals(deleteResponse, "{\"status\":\"Tables: [" + tableName + 
"_OFFLINE] deleted\"}");
+  }
+
+  private static void waitForTaskState(String taskName, TaskState 
expectedState) {
+    TestUtils.waitForCondition((aVoid) -> {
+      String response;
+      try {
+        response = 
sendGetRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder().forMinionTaskState(taskName));
+      } catch (IOException e) {
+        return false;
+      }
+      return response.replace("\"", "").equals(expectedState.name());
+    }, 5000, "Task not scheduled");
+  }
+
+  @Test
+  public void testTableTasksCleanupWithActiveTasks()
+      throws Exception {
+    String tableName = "testTableTasksCleanupActive";
+    DEFAULT_INSTANCE.addDummySchema(tableName);
+
+    TableConfig offlineTableConfig = getOfflineTableBuilder(tableName)
+        .setTaskConfig(new TableTaskConfig(ImmutableMap.of(
+            MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE,
+            ImmutableMap.of(PinotTaskManager.SCHEDULE_KEY, "0 */10 * ? * * *",
+                CommonConstants.TABLE_NAME, tableName + "_OFFLINE"))))
+        .build();
+
+    // Create table
+    sendPostRequest(_createTableUrl, offlineTableConfig.toJsonString());
+
+    // Create an active/in-progress task
+    PinotTaskManager taskManager = 
DEFAULT_INSTANCE.getControllerStarter().getTaskManager();
+    TaskSchedulingContext context = new TaskSchedulingContext();
+    context.setTablesToSchedule(Set.of(tableName + "_OFFLINE"));
+    Map<String, TaskSchedulingInfo> taskInfo = 
taskManager.scheduleTasks(context);
+    String taskName = 
taskInfo.values().iterator().next().getScheduledTaskNames().get(0);
+    waitForTaskState(taskName, TaskState.IN_PROGRESS);
+    try {
+      // Try to delete table without ignoring active tasks - should fail
+      
sendDeleteRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder().forTableDelete(tableName));
+      fail("Table deletion should fail when active tasks exist");
+    } catch (IOException e) {
+      assertTrue(e.getMessage().contains("The table has") && 
e.getMessage().contains("active running tasks"));
+    }
+
+    // Delete table with ignoreActiveTasks flag - should succeed
+    String deleteResponse = sendDeleteRequest(
+        
DEFAULT_INSTANCE.getControllerRequestURLBuilder().forTableDelete(tableName + 
"?ignoreActiveTasks=true"));
+    assertEquals(deleteResponse, "{\"status\":\"Tables: [" + tableName + 
"_OFFLINE] deleted\"}");
+
+    // delete task
+    
sendDeleteRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder().forDeleteMinionTask(taskName)
+        + "?forceDelete=true");
+  }
+
+  @Test
+  public void testTableTasksCleanupWithNullTaskConfig()
+      throws Exception {
+    String tableName = "testTableTasksCleanupNullConfig";
+    DEFAULT_INSTANCE.addDummySchema(tableName);
+
+    TableConfig offlineTableConfig = 
getOfflineTableBuilder(tableName).build(); // No task config
+
+    // Create table
+    sendPostRequest(_createTableUrl, offlineTableConfig.toJsonString());
+
+    // Delete table - should succeed even with null task config
+    String deleteResponse = sendDeleteRequest(
+        
DEFAULT_INSTANCE.getControllerRequestURLBuilder().forTableDelete(tableName));
+    assertEquals(deleteResponse, "{\"status\":\"Tables: [" + tableName + 
"_OFFLINE] deleted\"}");
+  }
+
   @AfterMethod
   public void cleanUp()
       throws IOException {
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
index 0fad49e6407..cb5de2956ac 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
@@ -118,6 +118,22 @@ public class ControllerRequestURLBuilder {
         + "&type=" + tableType);
   }
 
+  public String forMinionTaskState(String taskName) {
+    return StringUtil.join("/", _baseUrl, "tasks", "task", taskName, "state");
+  }
+
+  public String forDeleteMinionTask(String taskName) {
+    return StringUtil.join("/", _baseUrl, "tasks", "task", taskName);
+  }
+
+  public String forStopMinionTaskQueue(String taskType) {
+    return StringUtil.join("/", _baseUrl, "tasks", taskType, "stop");
+  }
+
+  public String forResumeMinionTaskQueue(String taskType) {
+    return StringUtil.join("/", _baseUrl, "tasks", taskType, "resume");
+  }
+
   public String forUpdateUserConfig(String username, String componentTypeStr, 
boolean passwordChanged) {
     StringBuilder params = new StringBuilder();
     if (StringUtils.isNotBlank(username)) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org


Reply via email to