This is an automated email from the ASF dual-hosted git repository. yashmayya pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new f616be74201 Perform task data cleanup as part of table deletion (#16307) f616be74201 is described below commit f616be74201ea0e68bf01443e2db4aab19a79027 Author: Shounak kulkarni <shounakmk...@gmail.com> AuthorDate: Wed Jul 16 17:53:17 2025 +0530 Perform task data cleanup as part of table deletion (#16307) --- .../api/resources/PinotTableRestletResource.java | 83 ++++- .../api/resources/TableConfigsRestletResource.java | 26 +- .../core/minion/PinotHelixTaskResourceManager.java | 7 +- .../api/PinotTableRestletResourceTest.java | 349 +++++++++++++++------ .../utils/builder/ControllerRequestURLBuilder.java | 16 + 5 files changed, 373 insertions(+), 108 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java index bd97c37d1e5..7f1dae48634 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java @@ -72,6 +72,7 @@ import org.apache.hc.client5.http.io.HttpClientConnectionManager; import org.apache.helix.AccessOption; import org.apache.helix.model.IdealState; import org.apache.helix.store.zk.ZkHelixPropertyStore; +import org.apache.helix.task.TaskState; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.common.exception.InvalidConfigException; import org.apache.pinot.common.exception.RebalanceInProgressException; @@ -207,8 +208,10 @@ public class PinotTableRestletResource { @ManualAuthorization // performed after parsing table configs public ConfigSuccessResponse addTable(String tableConfigStr, @ApiParam(value = "comma separated list of validation type(s) to skip. 
supported types: (ALL|TASK|UPSERT)") - @QueryParam("validationTypesToSkip") @Nullable String typesToSkip, @Context HttpHeaders httpHeaders, - @Context Request request) { + @QueryParam("validationTypesToSkip") @Nullable String typesToSkip, + @ApiParam(defaultValue = "false") @QueryParam("ignoreActiveTasks") boolean ignoreActiveTasks, + @Context HttpHeaders httpHeaders, @Context Request request) + throws IOException { // TODO introduce a table config ctor with json string. Pair<TableConfig, Map<String, Object>> tableConfigAndUnrecognizedProperties; TableConfig tableConfig; @@ -246,6 +249,9 @@ public class PinotTableRestletResource { } catch (Exception e) { throw new InvalidTableConfigException(e); } + if (!ignoreActiveTasks) { + tableTasksValidation(tableConfig, _pinotHelixTaskResourceManager); + } _pinotHelixResourceManager.addTable(tableConfig); // TODO: validate that table was created successfully // (in realtime case, metadata might not have been created but would be created successfully in the next run of @@ -260,6 +266,8 @@ public class PinotTableRestletResource { throw new ControllerApplicationException(LOGGER, errStr, Response.Status.BAD_REQUEST, e); } else if (e instanceof TableAlreadyExistsException) { throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.CONFLICT, e); + } else if (e instanceof ControllerApplicationException) { + throw e; } else { throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e); } @@ -426,6 +434,7 @@ public class PinotTableRestletResource { @ApiParam(value = "Retention period for the table segments (e.g. 12h, 3d); If not set, the retention period " + "will default to the first config that's not null: the cluster setting, then '7d'. 
Using 0d or -1d will " + "instantly delete segments without retention") @QueryParam("retention") String retentionPeriod, + @ApiParam(defaultValue = "false") @QueryParam("ignoreActiveTasks") boolean ignoreActiveTasks, @Context HttpHeaders headers) { TableType tableType = Constants.validateTableType(tableTypeStr); @@ -435,21 +444,25 @@ public class PinotTableRestletResource { validateLogicalTableReference(tableName, tableType); boolean tableExist = false; if (verifyTableType(tableName, tableType, TableType.OFFLINE)) { + String tableWithType = TableNameBuilder.OFFLINE.tableNameWithType(tableName); + tableTasksCleanup(tableWithType, ignoreActiveTasks, _pinotHelixResourceManager, _pinotHelixTaskResourceManager); tableExist = _pinotHelixResourceManager.hasOfflineTable(tableName); // Even the table name does not exist, still go on to delete remaining table metadata in case a previous delete // did not complete. _pinotHelixResourceManager.deleteOfflineTable(tableName, retentionPeriod); if (tableExist) { - tablesDeleted.add(TableNameBuilder.OFFLINE.tableNameWithType(tableName)); + tablesDeleted.add(tableWithType); } } if (verifyTableType(tableName, tableType, TableType.REALTIME)) { + String tableWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName); + tableTasksCleanup(tableWithType, ignoreActiveTasks, _pinotHelixResourceManager, _pinotHelixTaskResourceManager); tableExist = _pinotHelixResourceManager.hasRealtimeTable(tableName); // Even the table name does not exist, still go on to delete remaining table metadata in case a previous delete // did not complete. 
_pinotHelixResourceManager.deleteRealtimeTable(tableName, retentionPeriod); if (tableExist) { - tablesDeleted.add(TableNameBuilder.REALTIME.tableNameWithType(tableName)); + tablesDeleted.add(tableWithType); } } if (!tablesDeleted.isEmpty()) { @@ -463,6 +476,68 @@ public class PinotTableRestletResource { "Table '" + tableName + "' with type " + tableType + " does not exist", Response.Status.NOT_FOUND); } + /** + * Throws a BAD_REQUEST {@link ControllerApplicationException} if any task type configured on the table still + * has task states recorded for this table (dangling task data, e.g. from an incomplete delete). + * No-op when the table has no task config. Task types for which the task-state lookup throws + * {@link IllegalArgumentException} are logged and skipped; the remaining task types are still checked. + */ + public static void tableTasksValidation(TableConfig tableConfig, + PinotHelixTaskResourceManager pinotHelixTaskResourceManager) { + if (tableConfig.getTaskConfig() == null) { + return; + } + String tableWithType = tableConfig.getTableName(); + Map<String, Map<String, String>> taskTypeConfigsMap = tableConfig.getTaskConfig().getTaskTypeConfigsMap(); + for (String taskType : taskTypeConfigsMap.keySet()) { + Map<String, TaskState> taskStates; + try { + taskStates = pinotHelixTaskResourceManager.getTaskStatesByTable(taskType, tableWithType); + } catch (IllegalArgumentException e) { + // getTaskStatesByTable rejected this task type (presumably no task queue exists for it yet). Skip only + // this type and keep checking the others, as tableTasksCleanup does; returning here would silently + // skip validation of every remaining task type. + LOGGER.info(e.getMessage()); + continue; + } + if (!taskStates.isEmpty()) { + throw new ControllerApplicationException(LOGGER, "The table has dangling task data, try performing table " + + "delete operation in case the delete operation was not completed successfully, else delete the tasks " + + "manually through DELETE /tasks/task/{taskName} endpoint.
Please try again once the dangling tasks are " + + "cleaned up", Response.Status.BAD_REQUEST); + } + } + } + + /** + * Cleans up task data ahead of deleting {@code tableWithType}. + * + * <p>The steps are: + * <ol> + * <li>Remove the task schedules from the table config and persist the change, so no new tasks get scheduled + * during deletion.</li> + * <li>For every configured task type, delete (via {@code deleteTask(taskName, true)}) each task that is not + * IN_PROGRESS, and collect the IN_PROGRESS ones.</li> + * </ol> + * No-op when the table config or its task config is missing. + * + * @throws ControllerApplicationException (BAD_REQUEST) listing the IN_PROGRESS tasks, unless + * {@code ignoreActiveTasks} is set, in which case those tasks are left in place + * @throws IOException as declared; NOTE(review): presumably from updateTableConfig — confirm + */ + public static void tableTasksCleanup(String tableWithType, boolean ignoreActiveTasks, + PinotHelixResourceManager pinotHelixResourceManager, PinotHelixTaskResourceManager pinotHelixTaskResourceManager) + throws IOException { + TableConfig tableConfig = pinotHelixResourceManager.getTableConfig(tableWithType); + if (tableConfig == null || tableConfig.getTaskConfig() == null) { + return; + } + Map<String, Map<String, String>> taskTypeConfigsMap = tableConfig.getTaskConfig().getTaskTypeConfigsMap(); + Set<String> taskTypes = taskTypeConfigsMap.keySet(); + for (String taskType : taskTypes) { + // remove the task schedules to avoid task being scheduled during table deletion + taskTypeConfigsMap.get(taskType).remove(PinotTaskManager.SCHEDULE_KEY); + } + pinotHelixResourceManager.updateTableConfig(tableConfig); + List<String> pendingTasks = new ArrayList<>(); + for (String taskType : taskTypes) { + Map<String, TaskState> taskStates; + try { + taskStates = pinotHelixTaskResourceManager.getTaskStatesByTable(taskType, tableWithType); + } catch (IllegalArgumentException e) { + LOGGER.info(e.getMessage()); + continue; + } + for (String taskName : taskStates.keySet()) { + if (TaskState.IN_PROGRESS.equals(taskStates.get(taskName))) { + pendingTasks.add(taskName); + } else { + pinotHelixTaskResourceManager.deleteTask(taskName, true); + } + } + } + if (!ignoreActiveTasks && !pendingTasks.isEmpty()) { + throw new ControllerApplicationException(LOGGER, "The table has " + pendingTasks.size() + " active running tasks " + + ": " + pendingTasks + ". The task schedules have been cleared, so new tasks should not be generated. " + + "Please try again once there are no more active tasks", Response.Status.BAD_REQUEST); + } + } + // Return true iff the table is of the expectedType based on the given tableName and tableType.
The truth table: // tableType TableNameBuilder.getTableTypeFromTableName(tableName) Return value // 1. null null (i.e., table has no type suffix) true diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigsRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigsRestletResource.java index 3a15ecb30e8..1a26ddc8265 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigsRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigsRestletResource.java @@ -64,6 +64,7 @@ import org.apache.pinot.controller.api.exception.ControllerApplicationException; import org.apache.pinot.controller.api.exception.InvalidTableConfigException; import org.apache.pinot.controller.api.exception.TableAlreadyExistsException; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager; import org.apache.pinot.controller.helix.core.minion.PinotTaskManager; import org.apache.pinot.controller.tuner.TableConfigTunerUtils; import org.apache.pinot.controller.util.TaskConfigUtils; @@ -108,6 +109,9 @@ public class TableConfigsRestletResource { @Inject PinotHelixResourceManager _pinotHelixResourceManager; + @Inject + PinotHelixTaskResourceManager _pinotHelixTaskResourceManager; + @Inject PinotTaskManager _pinotTaskManager; @@ -192,8 +196,10 @@ public class TableConfigsRestletResource { public ConfigSuccessResponse addConfig( String tableConfigsStr, @ApiParam(value = "comma separated list of validation type(s) to skip. 
supported types: (ALL|TASK|UPSERT)") - @QueryParam("validationTypesToSkip") @Nullable String typesToSkip, @Context HttpHeaders httpHeaders, - @Context Request request) { + @QueryParam("validationTypesToSkip") @Nullable String typesToSkip, + @ApiParam(defaultValue = "false") @QueryParam("ignoreActiveTasks") boolean ignoreActiveTasks, + @Context HttpHeaders httpHeaders, @Context Request request) + throws Exception { Pair<TableConfigs, Map<String, Object>> tableConfigsAndUnrecognizedProps; try { tableConfigsAndUnrecognizedProps = @@ -231,9 +237,15 @@ public class TableConfigsRestletResource { if (offlineTableConfig != null) { tuneConfig(offlineTableConfig, schema); + if (!ignoreActiveTasks) { + PinotTableRestletResource.tableTasksValidation(offlineTableConfig, _pinotHelixTaskResourceManager); + } } if (realtimeTableConfig != null) { tuneConfig(realtimeTableConfig, schema); + if (!ignoreActiveTasks) { + PinotTableRestletResource.tableTasksValidation(realtimeTableConfig, _pinotHelixTaskResourceManager); + } } try { _pinotHelixResourceManager.addSchema(schema, false, false); @@ -264,6 +276,8 @@ public class TableConfigsRestletResource { rawTableName, e.getMessage()), Response.Status.BAD_REQUEST, e); } else if (e instanceof TableAlreadyExistsException) { throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.CONFLICT, e); + } else if (e instanceof ControllerApplicationException) { + throw e; } else { throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e); } @@ -283,7 +297,9 @@ public class TableConfigsRestletResource { @ApiOperation(value = "Delete the TableConfigs", notes = "Delete the TableConfigs") public SuccessResponse deleteConfig( @ApiParam(value = "TableConfigs name i.e. 
raw table name", required = true) @PathParam("tableName") - String tableName, @Context HttpHeaders headers) { + String tableName, + @ApiParam(defaultValue = "false") @QueryParam("ignoreActiveTasks") boolean ignoreActiveTasks, + @Context HttpHeaders headers) { try { if (TableNameBuilder.isOfflineTableResource(tableName) || TableNameBuilder.isRealtimeTableResource(tableName)) { throw new ControllerApplicationException(LOGGER, "Invalid table name: " + tableName + ". Use raw table name.", @@ -306,9 +322,13 @@ public class TableConfigsRestletResource { boolean tableExists = _pinotHelixResourceManager.hasRealtimeTable(tableName) || _pinotHelixResourceManager.hasOfflineTable( tableName); + PinotTableRestletResource.tableTasksCleanup(TableNameBuilder.REALTIME.tableNameWithType(tableName), + ignoreActiveTasks, _pinotHelixResourceManager, _pinotHelixTaskResourceManager); // Delete whether tables exist or not _pinotHelixResourceManager.deleteRealtimeTable(tableName); LOGGER.info("Deleted realtime table: {}", tableName); + PinotTableRestletResource.tableTasksCleanup(TableNameBuilder.OFFLINE.tableNameWithType(tableName), + ignoreActiveTasks, _pinotHelixResourceManager, _pinotHelixTaskResourceManager); _pinotHelixResourceManager.deleteOfflineTable(tableName); LOGGER.info("Deleted offline table: {}", tableName); boolean schemaExists = _pinotHelixResourceManager.deleteSchema(tableName); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java index e1620f7c935..6f1fb4c0a22 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java @@ -524,8 +524,11 @@ public class PinotHelixTaskResourceManager { * @return List of child 
task configs */ public synchronized List<PinotTaskConfig> getSubtaskConfigs(String taskName) { - Collection<TaskConfig> helixTaskConfigs = - _taskDriver.getJobConfig(getHelixJobName(taskName)).getTaskConfigMap().values(); + JobConfig jobConfig = _taskDriver.getJobConfig(getHelixJobName(taskName)); + if (jobConfig == null) { + return List.of(); + } + Collection<TaskConfig> helixTaskConfigs = jobConfig.getTaskConfigMap().values(); List<PinotTaskConfig> taskConfigs = new ArrayList<>(helixTaskConfigs.size()); for (TaskConfig helixTaskConfig : helixTaskConfigs) { taskConfigs.add(PinotTaskConfig.fromHelixTaskConfig(helixTaskConfig)); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java index e464032ee8f..7af7ad37345 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java @@ -29,10 +29,15 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import javax.ws.rs.core.Response; import org.apache.commons.lang3.tuple.Pair; +import org.apache.helix.task.TaskState; import org.apache.pinot.controller.helix.ControllerTest; +import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor; import org.apache.pinot.controller.helix.core.minion.PinotTaskManager; +import org.apache.pinot.controller.helix.core.minion.TaskSchedulingContext; +import org.apache.pinot.controller.helix.core.minion.TaskSchedulingInfo; import org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator; import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult; import org.apache.pinot.core.common.MinionConstants; @@ -50,18 +55,21 @@ import 
org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.LogicalTableConfig; import org.apache.pinot.spi.data.Schema; -import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.spi.utils.StringUtil; import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.apache.pinot.util.TestUtils; +import org.mockito.Mockito; import org.testng.annotations.AfterClass; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import static org.mockito.ArgumentMatchers.any; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; @@ -76,8 +84,8 @@ import static org.testng.Assert.fail; public class PinotTableRestletResourceTest extends ControllerTest { private static final String OFFLINE_TABLE_NAME = "testOfflineTable"; private static final String REALTIME_TABLE_NAME = "testRealtimeTable"; - private final TableConfigBuilder _offlineBuilder = new TableConfigBuilder(TableType.OFFLINE); - private final TableConfigBuilder _realtimeBuilder = new TableConfigBuilder(TableType.REALTIME); + private final TableConfigBuilder _offlineBuilder = getOfflineTableBuilder(OFFLINE_TABLE_NAME); + private final TableConfigBuilder _realtimeBuilder = getRealtimeTableBuilder(REALTIME_TABLE_NAME); private String _createTableUrl; @BeforeClass @@ -86,13 +94,25 @@ public class PinotTableRestletResourceTest extends ControllerTest { DEFAULT_INSTANCE.setupSharedStateAndValidate(); registerMinionTasks(); _createTableUrl = 
DEFAULT_INSTANCE.getControllerRequestURLBuilder().forTableCreate(); - _offlineBuilder.setTableName(OFFLINE_TABLE_NAME).setTimeColumnName("timeColumn").setTimeType("DAYS") - .setRetentionTimeUnit("DAYS").setRetentionTimeValue("5"); + } + + private TableConfigBuilder getRealtimeTableBuilder(String tableName) { + return new TableConfigBuilder(TableType.REALTIME) + .setTableName(tableName) + .setTimeColumnName("timeColumn") + .setTimeType("DAYS") + .setRetentionTimeUnit("DAYS") + .setRetentionTimeValue("5") + .setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()); + } - StreamConfig streamConfig = FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs(); - _realtimeBuilder.setTableName(REALTIME_TABLE_NAME).setTimeColumnName("timeColumn").setTimeType("DAYS") - .setRetentionTimeUnit("DAYS").setRetentionTimeValue("5") - .setStreamConfigs(streamConfig.getStreamConfigsMap()); + private TableConfigBuilder getOfflineTableBuilder(String tableName) { + return new TableConfigBuilder(TableType.OFFLINE) + .setTableName(tableName) + .setTimeColumnName("timeColumn") + .setTimeType("DAYS") + .setRetentionTimeUnit("DAYS") + .setRetentionTimeValue("5"); } @BeforeMethod @@ -104,39 +124,29 @@ public class PinotTableRestletResourceTest extends ControllerTest { private void registerMinionTasks() { PinotTaskManager taskManager = DEFAULT_INSTANCE.getControllerStarter().getTaskManager(); - taskManager.registerTaskGenerator(new BaseTaskGenerator() { - @Override - public String getTaskType() { - return MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE; - } - - @Override - public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) { - return List.of(new PinotTaskConfig(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE, new HashMap<>())); - } - }); - taskManager.registerTaskGenerator(new BaseTaskGenerator() { - @Override - public String getTaskType() { - return MinionConstants.MergeRollupTask.TASK_TYPE; - } + ClusterInfoAccessor 
clusterInfoAccessor = Mockito.mock(ClusterInfoAccessor.class); + Mockito.when(clusterInfoAccessor.getClusterConfig(any())).thenReturn(null); + registerTaskGenerator(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE, taskManager, clusterInfoAccessor); + registerTaskGenerator(MinionConstants.MergeRollupTask.TASK_TYPE, taskManager, clusterInfoAccessor); + registerTaskGenerator(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, taskManager, clusterInfoAccessor); + } - @Override - public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) { - return List.of(new PinotTaskConfig(MinionConstants.MergeRollupTask.TASK_TYPE, new HashMap<>())); - } - }); - taskManager.registerTaskGenerator(new BaseTaskGenerator() { + private static void registerTaskGenerator(String taskType, PinotTaskManager taskManager, + ClusterInfoAccessor clusterInfoAccessor) { + BaseTaskGenerator taskGenerator = new BaseTaskGenerator() { @Override public String getTaskType() { - return MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE; + return taskType; } @Override public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) { - return List.of(new PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, new HashMap<>())); + return List.of(new PinotTaskConfig(taskType, + tableConfigs.get(0).getTaskConfig().getConfigsForTaskType(getTaskType()))); } - }); + }; + taskGenerator.init(clusterInfoAccessor); + taskManager.registerTaskGenerator(taskGenerator); } @Test @@ -881,13 +891,7 @@ public class PinotTableRestletResourceTest extends ControllerTest { instanceAssignmentConfigMap.put(InstancePartitionsType.CONSUMING.name(), getInstanceAssignmentConfig("DefaultTenant_REALTIME", 4, 2)); - TableConfig realtimeTableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(tableName) - .setServerTenant("DefaultTenant") - .setTimeColumnName("timeColumn") - .setTimeType("DAYS") - .setRetentionTimeUnit("DAYS") - .setRetentionTimeValue("5") - 
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()) + TableConfig realtimeTableConfig = getRealtimeTableBuilder(tableName) .setInstanceAssignmentConfigMap(instanceAssignmentConfigMap) .setNumReplicas(10) .build(); @@ -903,13 +907,7 @@ public class PinotTableRestletResourceTest extends ControllerTest { instanceAssignmentConfigMap.put(InstancePartitionsType.CONSUMING.name(), getInstanceAssignmentConfig("DefaultTenant_REALTIME", 4, 1)); - realtimeTableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(tableName) - .setServerTenant("DefaultTenant") - .setTimeColumnName("timeColumn") - .setTimeType("DAYS") - .setRetentionTimeUnit("DAYS") - .setRetentionTimeValue("5") - .setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()) + realtimeTableConfig = getRealtimeTableBuilder(tableName) .setInstanceAssignmentConfigMap(instanceAssignmentConfigMap) .setNumReplicas(10) .build(); @@ -992,23 +990,11 @@ public class PinotTableRestletResourceTest extends ControllerTest { */ private void validateTableUpdateReplicationToInvalidValue(String rawTableName, TableType tableType) { String tableNameWithType = TableNameBuilder.forType(tableType).tableNameWithType(rawTableName); - TableConfig tableConfig = - tableType == TableType.REALTIME ? 
new TableConfigBuilder(TableType.REALTIME).setTableName(rawTableName) - .setServerTenant("DefaultTenant") - .setTimeColumnName("timeColumn") - .setTimeType("DAYS") - .setRetentionTimeUnit("DAYS") - .setRetentionTimeValue("5") - .setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()) - .setNumReplicas(5) - .build() : new TableConfigBuilder(TableType.OFFLINE).setTableName(rawTableName) - .setServerTenant("DefaultTenant") - .setTimeColumnName("timeColumn") - .setTimeType("DAYS") - .setRetentionTimeUnit("DAYS") - .setRetentionTimeValue("5") - .setNumReplicas(5) - .build(); + TableConfig tableConfig = (tableType == TableType.REALTIME + ? getRealtimeTableBuilder(rawTableName) + : getOfflineTableBuilder(rawTableName)) + .setNumReplicas(5) + .build(); try { sendPostRequest(_createTableUrl, tableConfig.toJsonString()); @@ -1018,23 +1004,11 @@ public class PinotTableRestletResourceTest extends ControllerTest { } private void createTableWithValidReplication(String rawTableName, TableType tableType) { - TableConfig tableConfig = - tableType == TableType.REALTIME ? new TableConfigBuilder(TableType.REALTIME).setTableName(rawTableName) - .setServerTenant("DefaultTenant") - .setTimeColumnName("timeColumn") - .setTimeType("DAYS") - .setRetentionTimeUnit("DAYS") - .setRetentionTimeValue("5") - .setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()) - .setNumReplicas(1) - .build() : new TableConfigBuilder(TableType.OFFLINE).setTableName(rawTableName) - .setServerTenant("DefaultTenant") - .setTimeColumnName("timeColumn") - .setTimeType("DAYS") - .setRetentionTimeUnit("DAYS") - .setRetentionTimeValue("5") - .setNumReplicas(1) - .build(); + TableConfig tableConfig = (tableType == TableType.REALTIME + ? 
getRealtimeTableBuilder(rawTableName) + : getOfflineTableBuilder(rawTableName)) + .setNumReplicas(1) + .build(); try { sendPostRequest(_createTableUrl, tableConfig.toJsonString()); @@ -1051,23 +1025,11 @@ public class PinotTableRestletResourceTest extends ControllerTest { throws IOException { String tableNameWithType = TableNameBuilder.forType(tableType).tableNameWithType(rawTableName); DEFAULT_INSTANCE.addDummySchema(rawTableName); - TableConfig tableConfig = - tableType == TableType.REALTIME ? new TableConfigBuilder(TableType.REALTIME).setTableName(rawTableName) - .setServerTenant("DefaultTenant") - .setTimeColumnName("timeColumn") - .setTimeType("DAYS") - .setRetentionTimeUnit("DAYS") - .setRetentionTimeValue("5") - .setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()) - .setNumReplicas(5) - .build() : new TableConfigBuilder(TableType.OFFLINE).setTableName(rawTableName) - .setServerTenant("DefaultTenant") - .setTimeColumnName("timeColumn") - .setTimeType("DAYS") - .setRetentionTimeUnit("DAYS") - .setRetentionTimeValue("5") - .setNumReplicas(5) - .build(); + TableConfig tableConfig = (tableType == TableType.REALTIME + ? 
getRealtimeTableBuilder(rawTableName) + : getOfflineTableBuilder(rawTableName)) + .setNumReplicas(5) + .build(); try { sendPostRequest(_createTableUrl, tableConfig.toJsonString()); @@ -1090,6 +1052,195 @@ public class PinotTableRestletResourceTest extends ControllerTest { InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.name(), false); } + @Test + public void testTableTasksValidationWithNoDanglingTasks() + throws Exception { + String tableName = "testTableTasksValidation"; + DEFAULT_INSTANCE.addDummySchema(tableName); + + TableConfig offlineTableConfig = getOfflineTableBuilder(tableName) + .setTaskConfig(new TableTaskConfig(ImmutableMap.of( + MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE, ImmutableMap.of()))) + .build(); + + // Should succeed when no dangling tasks exist + String creationResponse = sendPostRequest(_createTableUrl, offlineTableConfig.toJsonString()); + assertEquals(creationResponse, + "{\"unrecognizedProperties\":{},\"status\":\"Table testTableTasksValidation_OFFLINE successfully added\"}"); + + // Clean up + sendDeleteRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder().forTableDelete(tableName)); + } + + @Test + public void testTableTasksValidationWithDanglingTasks() + throws Exception { + String tableName = "testTableTasksValidationWithDangling"; + DEFAULT_INSTANCE.addDummySchema(tableName); + + TableConfig offlineTableConfig = getOfflineTableBuilder(tableName) + .setTaskConfig(new TableTaskConfig(ImmutableMap.of( + MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE, + ImmutableMap.of(PinotTaskManager.SCHEDULE_KEY, "0 */10 * ? 
* * *", + CommonConstants.TABLE_NAME, tableName + "_OFFLINE")))) + .build(); + + // First create the table successfully + sendPostRequest(_createTableUrl, offlineTableConfig.toJsonString()); + + // Create a task manually to simulate dangling task + PinotTaskManager taskManager = DEFAULT_INSTANCE.getControllerStarter().getTaskManager(); + TaskSchedulingContext context = new TaskSchedulingContext(); + context.setTablesToSchedule(Set.of(tableName + "_OFFLINE")); + Map<String, TaskSchedulingInfo> taskInfo = taskManager.scheduleTasks(context); + String taskName = taskInfo.values().iterator().next().getScheduledTaskNames().get(0); + waitForTaskState(taskName, TaskState.IN_PROGRESS); + + // Now try to create another table with same name (simulating re-creation with dangling tasks) + sendDeleteRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder() + .forTableDelete(tableName + "?ignoreActiveTasks=true")); + + try { + sendPostRequest(_createTableUrl, offlineTableConfig.toJsonString()); + fail("Table creation should fail when dangling tasks exist"); + } catch (IOException e) { + assertTrue(e.getMessage().contains("The table has dangling task data")); + } + + // Clean up any remaining tasks + try { + sendDeleteRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder() + .forTableDelete(tableName + "?ignoreActiveTasks=true")); + } catch (Exception ignored) { + // Ignore if table doesn't exist + } + } + + @Test + public void testTableTasksValidationWithNullTaskConfig() + throws Exception { + String tableName = "testTableTasksValidationNullConfig"; + DEFAULT_INSTANCE.addDummySchema(tableName); + + TableConfig offlineTableConfig = getOfflineTableBuilder(tableName).build(); // No task config + + // Should succeed when task config is null + String creationResponse = sendPostRequest(_createTableUrl, offlineTableConfig.toJsonString()); + assertEquals(creationResponse, "{\"unrecognizedProperties\":{}," + + "\"status\":\"Table testTableTasksValidationNullConfig_OFFLINE successfully 
added\"}"); + + // Clean up + sendDeleteRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder().forTableDelete(tableName)); + } + + @Test + public void testTableTasksCleanupWithNonActiveTasks() + throws Exception { + String tableName = "testTableTasksCleanup"; + DEFAULT_INSTANCE.addDummySchema(tableName); + + TableConfig offlineTableConfig = getOfflineTableBuilder(tableName) + .setTaskConfig(new TableTaskConfig(ImmutableMap.of( + MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE, + ImmutableMap.of(PinotTaskManager.SCHEDULE_KEY, "0 */10 * ? * * *", + CommonConstants.TABLE_NAME, tableName + "_OFFLINE")))) + .build(); + + // Create table + sendPostRequest(_createTableUrl, offlineTableConfig.toJsonString()); + + // Create some completed tasks + PinotTaskManager taskManager = DEFAULT_INSTANCE.getControllerStarter().getTaskManager(); + TaskSchedulingContext context = new TaskSchedulingContext(); + context.setTablesToSchedule(Set.of(tableName + "_OFFLINE")); + Map<String, TaskSchedulingInfo> taskInfo = taskManager.scheduleTasks(context); + String taskName = taskInfo.values().iterator().next().getScheduledTaskNames().get(0); + waitForTaskState(taskName, TaskState.IN_PROGRESS); + + // stop the task queue to abort the task + sendPutRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder() + .forStopMinionTaskQueue(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE)); + waitForTaskState(taskName, TaskState.STOPPED); + // resume the task queue again to avoid affecting other tests + sendPutRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder() + .forResumeMinionTaskQueue(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE)); + + // Delete table - should succeed and clean up tasks + String deleteResponse = sendDeleteRequest( + DEFAULT_INSTANCE.getControllerRequestURLBuilder().forTableDelete(tableName)); + assertEquals(deleteResponse, "{\"status\":\"Tables: [" + tableName + "_OFFLINE] deleted\"}"); + } + + private static void waitForTaskState(String taskName, 
TaskState expectedState) { + TestUtils.waitForCondition((aVoid) -> { + String response; + try { + response = sendGetRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder().forMinionTaskState(taskName)); + } catch (IOException e) { + return false; + } + return response.replace("\"", "").equals(expectedState.name()); + }, 5000, "Task not scheduled"); + } + + @Test + public void testTableTasksCleanupWithActiveTasks() + throws Exception { + String tableName = "testTableTasksCleanupActive"; + DEFAULT_INSTANCE.addDummySchema(tableName); + + TableConfig offlineTableConfig = getOfflineTableBuilder(tableName) + .setTaskConfig(new TableTaskConfig(ImmutableMap.of( + MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE, + ImmutableMap.of(PinotTaskManager.SCHEDULE_KEY, "0 */10 * ? * * *", + CommonConstants.TABLE_NAME, tableName + "_OFFLINE")))) + .build(); + + // Create table + sendPostRequest(_createTableUrl, offlineTableConfig.toJsonString()); + + // Create an active/in-progress task + PinotTaskManager taskManager = DEFAULT_INSTANCE.getControllerStarter().getTaskManager(); + TaskSchedulingContext context = new TaskSchedulingContext(); + context.setTablesToSchedule(Set.of(tableName + "_OFFLINE")); + Map<String, TaskSchedulingInfo> taskInfo = taskManager.scheduleTasks(context); + String taskName = taskInfo.values().iterator().next().getScheduledTaskNames().get(0); + waitForTaskState(taskName, TaskState.IN_PROGRESS); + try { + // Try to delete table without ignoring active tasks - should fail + sendDeleteRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder().forTableDelete(tableName)); + fail("Table deletion should fail when active tasks exist"); + } catch (IOException e) { + assertTrue(e.getMessage().contains("The table has") && e.getMessage().contains("active running tasks")); + } + + // Delete table with ignoreActiveTasks flag - should succeed + String deleteResponse = sendDeleteRequest( + DEFAULT_INSTANCE.getControllerRequestURLBuilder().forTableDelete(tableName + 
"?ignoreActiveTasks=true")); + assertEquals(deleteResponse, "{\"status\":\"Tables: [" + tableName + "_OFFLINE] deleted\"}"); + + // delete task + sendDeleteRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder().forDeleteMinionTask(taskName) + + "?forceDelete=true"); + } + + @Test + public void testTableTasksCleanupWithNullTaskConfig() + throws Exception { + String tableName = "testTableTasksCleanupNullConfig"; + DEFAULT_INSTANCE.addDummySchema(tableName); + + TableConfig offlineTableConfig = getOfflineTableBuilder(tableName).build(); // No task config + + // Create table + sendPostRequest(_createTableUrl, offlineTableConfig.toJsonString()); + + // Delete table - should succeed even with null task config + String deleteResponse = sendDeleteRequest( + DEFAULT_INSTANCE.getControllerRequestURLBuilder().forTableDelete(tableName)); + assertEquals(deleteResponse, "{\"status\":\"Tables: [" + tableName + "_OFFLINE] deleted\"}"); + } + @AfterMethod public void cleanUp() throws IOException { diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java index 0fad49e6407..cb5de2956ac 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java @@ -118,6 +118,22 @@ public class ControllerRequestURLBuilder { + "&type=" + tableType); } + public String forMinionTaskState(String taskName) { + return StringUtil.join("/", _baseUrl, "tasks", "task", taskName, "state"); + } + + public String forDeleteMinionTask(String taskName) { + return StringUtil.join("/", _baseUrl, "tasks", "task", taskName); + } + + public String forStopMinionTaskQueue(String taskType) { + return StringUtil.join("/", _baseUrl, "tasks", taskType, "stop"); + } + + public String forResumeMinionTaskQueue(String taskType) { + return 
StringUtil.join("/", _baseUrl, "tasks", taskType, "resume"); + } + public String forUpdateUserConfig(String username, String componentTypeStr, boolean passwordChanged) { StringBuilder params = new StringBuilder(); if (StringUtils.isNotBlank(username)) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org