This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new e1b0e5357e Refactor PinotTaskManager class (#12964) e1b0e5357e is described below commit e1b0e5357ebfcecffcc6cce3997a3edcdac1aa2c Author: Pratik Tibrewal <tibrewalpra...@uber.com> AuthorDate: Sat Apr 20 03:35:01 2024 +0530 Refactor PinotTaskManager class (#12964) --- .../api/resources/PinotTaskRestletResource.java | 14 +- .../helix/core/minion/CronJobScheduleJob.java | 2 +- .../helix/core/minion/PinotTaskManager.java | 162 +++++++++------------ .../MergeRollupMinionClusterIntegrationTest.java | 140 +++++++++--------- .../tests/PurgeMinionClusterIntegrationTest.java | 40 +++-- ...fflineSegmentsMinionClusterIntegrationTest.java | 55 ++++--- .../tests/SimpleMinionClusterIntegrationTest.java | 45 +++--- .../integration/tests/TlsIntegrationTest.java | 2 +- .../tests/UpsertTableIntegrationTest.java | 14 +- .../tests/UrlAuthRealtimeIntegrationTest.java | 2 +- 10 files changed, 217 insertions(+), 259 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java index e09bde8466..0d9d3a05c1 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java @@ -618,22 +618,20 @@ public class PinotTaskRestletResource { @ApiOperation("Schedule tasks and return a map from task type to task name scheduled") public Map<String, String> scheduleTasks(@ApiParam(value = "Task type") @QueryParam("taskType") String taskType, @ApiParam(value = "Table name (with type suffix)") @QueryParam("tableName") String tableName, - @ApiParam(value = "Minion Instance tag to schedule the task explicitly on") - @QueryParam("minionInstanceTag") @Nullable String minionInstanceTag, - @Context HttpHeaders headers) { + @ApiParam(value = "Minion Instance tag to schedule the task explicitly on") @QueryParam("minionInstanceTag") + @Nullable String minionInstanceTag, @Context HttpHeaders headers) { String database = headers != null ? headers.getHeaderString(DATABASE) : DEFAULT_DATABASE; if (taskType != null) { // Schedule task for the given task type - List<String> taskNames = tableName != null - ? _pinotTaskManager.scheduleTask(taskType, + List<String> taskNames = tableName != null ? _pinotTaskManager.scheduleTaskForTable(taskType, DatabaseUtils.translateTableName(tableName, headers), minionInstanceTag) : _pinotTaskManager.scheduleTaskForDatabase(taskType, database, minionInstanceTag); return Collections.singletonMap(taskType, taskNames == null ? null : StringUtils.join(taskNames, ',')); } else { // Schedule tasks for all task types - Map<String, List<String>> allTaskNames = tableName != null - ? _pinotTaskManager.scheduleTasks(DatabaseUtils.translateTableName(tableName, headers), minionInstanceTag) - : _pinotTaskManager.scheduleTasksForDatabase(database, minionInstanceTag); + Map<String, List<String>> allTaskNames = tableName != null ? _pinotTaskManager.scheduleAllTasksForTable( + DatabaseUtils.translateTableName(tableName, headers), minionInstanceTag) + : _pinotTaskManager.scheduleAllTasksForDatabase(database, minionInstanceTag); return allTaskNames.entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, entry -> String.join(",", entry.getValue()))); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/CronJobScheduleJob.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/CronJobScheduleJob.java index 8c0433854f..f9b250b2bc 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/CronJobScheduleJob.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/CronJobScheduleJob.java @@ -65,7 +65,7 @@ public class CronJobScheduleJob implements Job { return; } long jobStartTime = System.currentTimeMillis(); - pinotTaskManager.scheduleTask(taskType, table); + pinotTaskManager.scheduleTaskForTable(taskType, table, null); LOGGER.info("Finished CronJob: table - {}, task - {}, next runtime is {}", table, taskType, jobExecutionContext.getNextFireTime()); pinotTaskManager.getControllerMetrics().addTimedTableValue(PinotTaskManager.getCronJobName(table, taskType), diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java index 4029944139..97417d6bea 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java @@ -22,7 +22,6 @@ import com.google.common.base.Preconditions; import java.io.PrintWriter; import java.io.StringWriter; import java.util.ArrayList; -import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.HashSet; @@ -480,30 +479,72 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { } /** - * Public API to schedule tasks (all task types) for all tables in all databases. + * Schedules tasks (all task types) for all tables. * It might be called from the non-leader controller. * Returns a map from the task type to the list of tasks scheduled. */ - public synchronized Map<String, List<String>> scheduleTasks() { - return scheduleTasks(_pinotHelixResourceManager.getAllTables(), false, null); + public synchronized Map<String, List<String>> scheduleAllTasksForAllTables(@Nullable String minionInstanceTag) { + return scheduleTasks(_pinotHelixResourceManager.getAllTables(), false, minionInstanceTag); } /** - * Public API to schedule tasks (all task types) for all tables in given database. + * Schedules tasks (all task types) for all tables in the given database. * It might be called from the non-leader controller. * Returns a map from the task type to the list of tasks scheduled. */ - public synchronized Map<String, List<String>> scheduleTasksForDatabase(@Nullable String database, + public synchronized Map<String, List<String>> scheduleAllTasksForDatabase(@Nullable String database, @Nullable String minionInstanceTag) { return scheduleTasks(_pinotHelixResourceManager.getAllTables(database), false, minionInstanceTag); } + /** + * Schedules tasks (all task types) for the given table. + * It might be called from the non-leader controller. + * Returns a map from the task type to the list of tasks scheduled. + */ + public synchronized Map<String, List<String>> scheduleAllTasksForTable(String tableNameWithType, + @Nullable String minionInstanceTag) { + return scheduleTasks(List.of(tableNameWithType), false, minionInstanceTag); + } + + /** + * Schedules task for the given task type for all tables. + * It might be called from the non-leader controller. + * Returns a list of tasks scheduled, or {@code null} if no task is scheduled. + */ + @Nullable + public synchronized List<String> scheduleTaskForAllTables(String taskType, @Nullable String minionInstanceTag) { + return scheduleTask(taskType, _pinotHelixResourceManager.getAllTables(), minionInstanceTag); + } + + /** + * Schedules task for the given task type for all tables in the given database. + * It might be called from the non-leader controller. + * Returns a list of tasks scheduled, or {@code null} if no task is scheduled. + */ + @Nullable + public synchronized List<String> scheduleTaskForDatabase(String taskType, @Nullable String database, + @Nullable String minionInstanceTag) { + return scheduleTask(taskType, _pinotHelixResourceManager.getAllTables(database), minionInstanceTag); + } + + /** + * Schedules task for the given task type for the give table. + * It might be called from the non-leader controller. + * Returns a list of tasks scheduled, or {@code null} if no task is scheduled. + */ + @Nullable + public synchronized List<String> scheduleTaskForTable(String taskType, String tableNameWithType, + @Nullable String minionInstanceTag) { + return scheduleTask(taskType, List.of(tableNameWithType), minionInstanceTag); + } + /** * Helper method to schedule tasks (all task types) for the given tables that have the tasks enabled. Returns a map * from the task type to the list of the tasks scheduled. */ - private synchronized Map<String, List<String>> scheduleTasks(List<String> tableNamesWithType, - boolean isLeader, @Nullable String minionInstanceTag) { + private synchronized Map<String, List<String>> scheduleTasks(List<String> tableNamesWithType, boolean isLeader, + @Nullable String minionInstanceTag) { _controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED, 1L); // Scan all table configs to get the tables with tasks enabled @@ -541,6 +582,27 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { return tasksScheduled; } + @Nullable + private synchronized List<String> scheduleTask(String taskType, List<String> tables, + @Nullable String minionInstanceTag) { + PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType); + Preconditions.checkState(taskGenerator != null, "Task type: %s is not registered", taskType); + + // Scan all table configs to get the tables with task enabled + List<TableConfig> enabledTableConfigs = new ArrayList<>(); + for (String tableNameWithType : tables) { + TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType); + if (tableConfig != null && tableConfig.getTaskConfig() != null && tableConfig.getTaskConfig() + .isTaskTypeEnabled(taskType)) { + enabledTableConfigs.add(tableConfig); + } + } + + _helixTaskResourceManager.ensureTaskQueueExists(taskType); + addTaskTypeMetricsUpdaterIfNeeded(taskType); + return scheduleTask(taskGenerator, enabledTableConfigs, false, minionInstanceTag); + } + /** * Helper method to schedule task with the given task generator for the given tables that have the task enabled. * Returns the list of task names, or {@code null} if no task is scheduled. @@ -554,8 +616,8 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { for (TableConfig tableConfig : enabledTableConfigs) { String tableName = tableConfig.getTableName(); try { - String minionInstanceTag = minionInstanceTagForTask != null - ? minionInstanceTagForTask : taskGenerator.getMinionInstanceTag(tableConfig); + String minionInstanceTag = minionInstanceTagForTask != null ? minionInstanceTagForTask + : taskGenerator.getMinionInstanceTag(tableConfig); List<PinotTaskConfig> presentTaskConfig = minionInstanceTagToTaskConfigs.computeIfAbsent(minionInstanceTag, k -> new ArrayList<>()); taskGenerator.generateTasks(List.of(tableConfig), presentTaskConfig); @@ -622,86 +684,6 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { return submittedTaskNames; } - /** - * Public API to schedule tasks (all task types) for the given table. It might be called from the non-leader - * controller. Returns a map from the task type to the list of tasks scheduled. - */ - public synchronized Map<String, List<String>> scheduleTasks(String tableNameWithType) { - return scheduleTasks(Collections.singletonList(tableNameWithType), false, null); - } - - /** - * Public API to schedule tasks (all task types) for the given table on a specific instance tag. - * It might be called from the non-leader controller. Returns a map from the task type to the list of tasks scheduled. - */ - public synchronized Map<String, List<String>> scheduleTasks(String tableNameWithType, - @Nullable String minionInstanceTag) { - return scheduleTasks(Collections.singletonList(tableNameWithType), false, minionInstanceTag); - } - - /** - * Public API to schedule task for the given task type in all databases. - * It might be called from the non-leader controller. - * Returns the list of task names, or {@code null} if no task is scheduled. - */ - @Nullable - public synchronized List<String> scheduleTask(String taskType, @Nullable String minionInstanceTag) { - return scheduleTask(taskType, _pinotHelixResourceManager.getAllTables(), minionInstanceTag); - } - - /** - * Public API to schedule task for the given task type in given database. - * It might be called from the non-leader controller. - * Returns the list of task name, or {@code null} if no task is scheduled. - */ - @Nullable - public synchronized List<String> scheduleTaskForDatabase(String taskType, @Nullable String database, - @Nullable String minionInstanceTag) { - return scheduleTask(taskType, _pinotHelixResourceManager.getAllTables(database), minionInstanceTag); - } - - @Nullable - private List<String> scheduleTask(String taskType, List<String> tables, @Nullable String minionInstanceTag) { - PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType); - Preconditions.checkState(taskGenerator != null, "Task type: %s is not registered", taskType); - - // Scan all table configs to get the tables with task enabled - List<TableConfig> enabledTableConfigs = new ArrayList<>(); - for (String tableNameWithType : tables) { - TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType); - if (tableConfig != null && tableConfig.getTaskConfig() != null && tableConfig.getTaskConfig() - .isTaskTypeEnabled(taskType)) { - enabledTableConfigs.add(tableConfig); - } - } - - _helixTaskResourceManager.ensureTaskQueueExists(taskType); - addTaskTypeMetricsUpdaterIfNeeded(taskType); - return scheduleTask(taskGenerator, enabledTableConfigs, false, minionInstanceTag); - } - - /** - * Public API to schedule task for the given task type on the given table. It might be called from the non-leader - * controller. Returns the list of task names, or {@code null} if no task is scheduled. - */ - @Nullable - public synchronized List<String> scheduleTask(String taskType, String tableNameWithType, - @Nullable String minionInstanceTag) { - PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType); - Preconditions.checkState(taskGenerator != null, "Task type: %s is not registered", taskType); - - TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType); - Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", tableNameWithType); - - Preconditions.checkState( - tableConfig.getTaskConfig() != null && tableConfig.getTaskConfig().isTaskTypeEnabled(taskType), - "Table: %s does not have task type: %s enabled", tableNameWithType, taskType); - - _helixTaskResourceManager.ensureTaskQueueExists(taskType); - addTaskTypeMetricsUpdaterIfNeeded(taskType); - return scheduleTask(taskGenerator, Collections.singletonList(tableConfig), false, minionInstanceTag); - } - @Override protected void processTables(List<String> tableNamesWithType, Properties taskProperties) { scheduleTasks(tableNamesWithType, true, null); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java index b655416c87..c5be600661 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java @@ -139,14 +139,14 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat List<File> avroFiles = unpackAvroData(_tempDir); // Create and upload segments - ClusterIntegrationTestUtils - .buildSegmentsFromAvro(avroFiles, singleLevelConcatTableConfig, schema, 0, _segmentDir1, _tarDir1); + ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, singleLevelConcatTableConfig, schema, 0, _segmentDir1, + _tarDir1); buildSegmentsFromAvroWithPostfix(avroFiles, singleLevelRollupTableConfig, schema, 0, _segmentDir2, _tarDir2, "1"); buildSegmentsFromAvroWithPostfix(avroFiles, singleLevelRollupTableConfig, schema, 0, _segmentDir2, _tarDir2, "2"); - ClusterIntegrationTestUtils - .buildSegmentsFromAvro(avroFiles, multiLevelConcatTableConfig, schema, 0, _segmentDir3, _tarDir3); - ClusterIntegrationTestUtils - .buildSegmentsFromAvro(avroFiles, singleLevelConcatMetadataTableConfig, schema, 0, _segmentDir4, _tarDir4); + ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, multiLevelConcatTableConfig, schema, 0, _segmentDir3, + _tarDir3); + ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, singleLevelConcatMetadataTableConfig, schema, 0, + _segmentDir4, _tarDir4); uploadSegments(SINGLE_LEVEL_CONCAT_TEST_TABLE, _tarDir1); uploadSegments(SINGLE_LEVEL_ROLLUP_TEST_TABLE, _tarDir2); uploadSegments(MULTI_LEVEL_CONCAT_TEST_TABLE, _tarDir3); @@ -160,8 +160,8 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat schema.setSchemaName(MULTI_LEVEL_CONCAT_PROCESS_ALL_REALTIME_TABLE); addSchema(schema); TableConfig singleLevelConcatProcessAllRealtimeTableConfig = - createRealtimeTableConfigWithProcessAllMode(avroFiles.get(0), - MULTI_LEVEL_CONCAT_PROCESS_ALL_REALTIME_TABLE, PROCESS_ALL_MODE_KAFKA_TOPIC); + createRealtimeTableConfigWithProcessAllMode(avroFiles.get(0), MULTI_LEVEL_CONCAT_PROCESS_ALL_REALTIME_TABLE, + PROCESS_ALL_MODE_KAFKA_TOPIC); addTableConfig(singleLevelConcatProcessAllRealtimeTableConfig); // Push data into Kafka @@ -172,9 +172,8 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat ClusterIntegrationTestUtils.pushAvroIntoKafka(avroFiles.subList(0, 3), "localhost:" + getKafkaPort(), PROCESS_ALL_MODE_KAFKA_TOPIC, getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), getPartitionColumn(), injectTombstones()); - ClusterIntegrationTestUtils - .buildSegmentsFromAvro(avroFiles.subList(3, 9), singleLevelConcatProcessAllRealtimeTableConfig, schema, 0, - _segmentDir5, _tarDir5); + ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles.subList(3, 9), + singleLevelConcatProcessAllRealtimeTableConfig, schema, 0, _segmentDir5, _tarDir5); // Wait for all documents loaded waitForAllDocsLoaded(600_000L); @@ -216,14 +215,14 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat private TableConfig createOfflineTableConfig(String tableName, TableTaskConfig taskConfig, @Nullable SegmentPartitionConfig partitionConfig) { - return new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName) - .setTimeColumnName(getTimeColumnName()).setSortedColumn(getSortedColumn()) - .setInvertedIndexColumns(getInvertedIndexColumns()).setNoDictionaryColumns(getNoDictionaryColumns()) - .setRangeIndexColumns(getRangeIndexColumns()).setBloomFilterColumns(getBloomFilterColumns()) - .setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion()) - .setLoadMode(getLoadMode()).setTaskConfig(taskConfig).setBrokerTenant(getBrokerTenant()) - .setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig()) - .setNullHandlingEnabled(getNullHandlingEnabled()).setSegmentPartitionConfig(partitionConfig).build(); + return new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName).setTimeColumnName(getTimeColumnName()) + .setSortedColumn(getSortedColumn()).setInvertedIndexColumns(getInvertedIndexColumns()) + .setNoDictionaryColumns(getNoDictionaryColumns()).setRangeIndexColumns(getRangeIndexColumns()) + .setBloomFilterColumns(getBloomFilterColumns()).setFieldConfigList(getFieldConfigs()) + .setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode()) + .setTaskConfig(taskConfig).setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant()) + .setIngestionConfig(getIngestionConfig()).setNullHandlingEnabled(getNullHandlingEnabled()) + .setSegmentPartitionConfig(partitionConfig).build(); } protected TableConfig createRealtimeTableConfigWithProcessAllMode(File sampleAvroFile, String tableName, @@ -246,12 +245,12 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat tableTaskConfigs.put("ActualElapsedTime.aggregationType", "min"); tableTaskConfigs.put("WeatherDelay.aggregationType", "sum"); tableTaskConfigs.put("mode", "processAll"); - return new TableConfigBuilder(TableType.REALTIME).setTableName(tableName) - .setTimeColumnName(getTimeColumnName()).setSortedColumn(getSortedColumn()) - .setInvertedIndexColumns(getInvertedIndexColumns()).setNoDictionaryColumns(getNoDictionaryColumns()) - .setRangeIndexColumns(getRangeIndexColumns()).setBloomFilterColumns(getBloomFilterColumns()) - .setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion()) - .setLoadMode(getLoadMode()).setTaskConfig( + return new TableConfigBuilder(TableType.REALTIME).setTableName(tableName).setTimeColumnName(getTimeColumnName()) + .setSortedColumn(getSortedColumn()).setInvertedIndexColumns(getInvertedIndexColumns()) + .setNoDictionaryColumns(getNoDictionaryColumns()).setRangeIndexColumns(getRangeIndexColumns()) + .setBloomFilterColumns(getBloomFilterColumns()).setFieldConfigList(getFieldConfigs()) + .setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode()) + .setTaskConfig( new TableTaskConfig(Collections.singletonMap(MinionConstants.MergeRollupTask.TASK_TYPE, tableTaskConfigs))) .setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig()) .setQueryConfig(getQueryConfig()).setStreamConfigs(streamConfigs) @@ -411,17 +410,16 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat int numTasks = 0; List<String> taskList; for (String tasks = - _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0); - tasks != null; - taskList = _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE), - tasks = taskList != null ? taskList.get(0) : null, - numTasks++) { + _taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.MergeRollupTask.TASK_TYPE) + .get(0); tasks != null; taskList = + _taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.MergeRollupTask.TASK_TYPE), + tasks = taskList != null ? taskList.get(0) : null, numTasks++) { assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]); assertTrue(_helixTaskResourceManager.getTaskQueues() .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE))); // Will not schedule task if there's incomplete task - assertNull( - _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)); + assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null) + .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)); waitForTaskToComplete(); // Check watermark @@ -527,17 +525,16 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat int numTasks = 0; List<String> taskList; for (String tasks = - _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0); - tasks != null; - taskList = _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE), - tasks = taskList != null ? taskList.get(0) : null, - numTasks++) { + _taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.MergeRollupTask.TASK_TYPE) + .get(0); tasks != null; taskList = + _taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.MergeRollupTask.TASK_TYPE), + tasks = taskList != null ? taskList.get(0) : null, numTasks++) { assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]); assertTrue(_helixTaskResourceManager.getTaskQueues() .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE))); // Will not schedule task if there's incomplete task - assertNull( - _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)); + assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null) + .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)); waitForTaskToComplete(); // Check watermark @@ -636,17 +633,16 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat int numTasks = 0; List<String> taskList; for (String tasks = - _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0); - tasks != null; - taskList = _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE), - tasks = taskList != null ? taskList.get(0) : null, - numTasks++) { + _taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.MergeRollupTask.TASK_TYPE) + .get(0); tasks != null; taskList = + _taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.MergeRollupTask.TASK_TYPE), + tasks = taskList != null ? taskList.get(0) : null, numTasks++) { assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), 1); assertTrue(_helixTaskResourceManager.getTaskQueues() .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE))); // Will not schedule task if there's incomplete task - assertNull( - _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)); + assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null) + .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)); waitForTaskToComplete(); // Check watermark @@ -788,17 +784,16 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat int numTasks = 0; List<String> taskList; for (String tasks = - _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0); - tasks != null; - taskList = _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE), - tasks = taskList != null ? taskList.get(0) : null, - numTasks++) { + _taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.MergeRollupTask.TASK_TYPE) + .get(0); tasks != null; taskList = + _taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.MergeRollupTask.TASK_TYPE), + tasks = taskList != null ? taskList.get(0) : null, numTasks++) { assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]); assertTrue(_helixTaskResourceManager.getTaskQueues() .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE))); // Will not schedule task if there's incomplete task - assertNull( - _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)); + assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null) + .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)); waitForTaskToComplete(); // Check watermark @@ -859,8 +854,8 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat return false; } // Check if the task metadata is cleaned up - if (MinionTaskMetadataUtils - .fetchTaskMetadata(_propertyStore, MinionConstants.MergeRollupTask.TASK_TYPE, tableNameWithType) != null) { + if (MinionTaskMetadataUtils.fetchTaskMetadata(_propertyStore, MinionConstants.MergeRollupTask.TASK_TYPE, + tableNameWithType) != null) { return false; } return true; @@ -921,18 +916,17 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat int numTasks = 0; List<String> taskList; for (String tasks = - taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0); - tasks != null; - taskList = taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE), - tasks = taskList != null ? taskList.get(0) : null, - numTasks++) { + taskManager.scheduleAllTasksForTable(realtimeTableName, null).get(MinionConstants.MergeRollupTask.TASK_TYPE) + .get(0); tasks != null; taskList = + taskManager.scheduleAllTasksForTable(realtimeTableName, null).get(MinionConstants.MergeRollupTask.TASK_TYPE), + tasks = taskList != null ? taskList.get(0) : null, numTasks++) { // assertEquals(helixTaskResourceManager.getSubtaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]); assertTrue(helixTaskResourceManager.getTaskQueues() .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE))); // Will not schedule task if there's incomplete task - assertNull( - taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)); + assertNull(taskManager.scheduleAllTasksForTable(realtimeTableName, null) + .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)); waitForTaskToComplete(); // Check watermark @@ -1027,17 +1021,16 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat int numTasks = 0; List<String> taskList; for (String tasks = - taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0); - tasks != null; taskList = - taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE), - tasks = taskList != null ? taskList.get(0) : null, - numTasks++) { + taskManager.scheduleAllTasksForTable(realtimeTableName, null).get(MinionConstants.MergeRollupTask.TASK_TYPE) + .get(0); tasks != null; taskList = + taskManager.scheduleAllTasksForTable(realtimeTableName, null).get(MinionConstants.MergeRollupTask.TASK_TYPE), + tasks = taskList != null ? taskList.get(0) : null, numTasks++) { assertTrue(helixTaskResourceManager.getTaskQueues() .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE))); // Will not schedule task if there's incomplete task - assertNull( - taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)); + assertNull(taskManager.scheduleAllTasksForTable(realtimeTableName, null) + .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)); waitForTaskToComplete(); // Check not using watermarks @@ -1069,11 +1062,10 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat waitForAllDocsLoaded(600_000L); for (String tasks = - taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0); - tasks != null; taskList = - taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE), - tasks = taskList != null ? taskList.get(0) : null, - numTasks++) { + taskManager.scheduleAllTasksForTable(realtimeTableName, null).get(MinionConstants.MergeRollupTask.TASK_TYPE) + .get(0); tasks != null; taskList = + taskManager.scheduleAllTasksForTable(realtimeTableName, null).get(MinionConstants.MergeRollupTask.TASK_TYPE), + tasks = taskList != null ? taskList.get(0) : null, numTasks++) { waitForTaskToComplete(); // Check metrics long numBucketsToProcess = MetricValueUtils.getGaugeValue(_controllerStarter.getControllerMetrics(), diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java index c4ba131f6d..da4e85696c 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.integration.tests; -import com.google.common.collect.ImmutableList; import java.io.File; import java.util.ArrayList; import java.util.Arrays; @@ -63,7 +62,6 @@ public class PurgeMinionClusterIntegrationTest extends BaseClusterIntegrationTes private static final String PURGE_DELTA_NOT_PASSED_TABLE = "myTable3"; private static final String PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE = "myTable4"; - protected PinotHelixTaskResourceManager _helixTaskResourceManager; protected PinotTaskManager _taskManager; protected PinotHelixResourceManager _pinotHelixResourceManager; @@ -83,12 +81,8 @@ public class PurgeMinionClusterIntegrationTest extends BaseClusterIntegrationTes startBrokers(1); startServers(1); - List<String> allTables = ImmutableList.of( - PURGE_FIRST_RUN_TABLE, - PURGE_DELTA_PASSED_TABLE, - PURGE_DELTA_NOT_PASSED_TABLE, - PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE - ); + List<String> allTables = List.of(PURGE_FIRST_RUN_TABLE, PURGE_DELTA_PASSED_TABLE, PURGE_DELTA_NOT_PASSED_TABLE, + PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE); Schema schema = null; TableConfig tableConfig = null; for (String tableName : allTables) { @@ -152,12 +146,9 @@ public class PurgeMinionClusterIntegrationTest extends BaseClusterIntegrationTes private void setRecordPurger() { MinionContext minionContext = MinionContext.getInstance(); minionContext.setRecordPurgerFactory(rawTableName -> { - List<String> tableNames = Arrays.asList( - PURGE_FIRST_RUN_TABLE, - PURGE_DELTA_PASSED_TABLE, - PURGE_DELTA_NOT_PASSED_TABLE, - PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE - ); + List<String> tableNames = + Arrays.asList(PURGE_FIRST_RUN_TABLE, PURGE_DELTA_PASSED_TABLE, PURGE_DELTA_NOT_PASSED_TABLE, + PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE); if (tableNames.contains(rawTableName)) { return row -> row.getValue("ArrTime").equals(1); } else { @@ -195,11 +186,12 @@ public class PurgeMinionClusterIntegrationTest extends BaseClusterIntegrationTes // 5. Check the purge process itself by setting an expecting number of rows String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(PURGE_FIRST_RUN_TABLE); - assertNotNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE)); + assertNotNull( + _taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.PurgeTask.TASK_TYPE)); assertTrue(_helixTaskResourceManager.getTaskQueues() .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.PurgeTask.TASK_TYPE))); // Will not schedule task if there's incomplete task - assertNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE)); + assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.PurgeTask.TASK_TYPE)); waitForTaskToComplete(); // Check that metadata contains expected values @@ -209,7 +201,7 @@ public class PurgeMinionClusterIntegrationTest extends BaseClusterIntegrationTes metadata.getCustomMap().containsKey(MinionConstants.PurgeTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX)); } // Should not generate new purge task as the last time purge is not greater than last + 1day (default purge delay) - assertNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE)); + assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.PurgeTask.TASK_TYPE)); // 52 rows with ArrTime = 1 // 115545 totals rows @@ -239,11 +231,12 @@ public class PurgeMinionClusterIntegrationTest extends BaseClusterIntegrationTes // 5. Check the purge process itself by setting an expecting number of rows String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(PURGE_DELTA_PASSED_TABLE); - assertNotNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE)); + assertNotNull( + _taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.PurgeTask.TASK_TYPE)); assertTrue(_helixTaskResourceManager.getTaskQueues() .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.PurgeTask.TASK_TYPE))); // Will not schedule task if there's incomplete task - assertNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE)); + assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.PurgeTask.TASK_TYPE)); waitForTaskToComplete(); // Check that metadata contains expected values @@ -255,7 +248,7 @@ public class PurgeMinionClusterIntegrationTest extends BaseClusterIntegrationTes assertTrue(System.currentTimeMillis() - Long.parseLong(purgeTime) < 86400000); } // Should not generate new purge task as the last time purge is not greater than last + 1day (default purge delay) - assertNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE)); + assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.PurgeTask.TASK_TYPE)); // 52 rows with ArrTime = 1 // 115545 totals rows @@ -287,7 +280,7 @@ public class PurgeMinionClusterIntegrationTest extends BaseClusterIntegrationTes String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(PURGE_DELTA_NOT_PASSED_TABLE); // No task should be schedule as the delay is not passed - assertNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE)); + assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.PurgeTask.TASK_TYPE)); for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) { // Check purge time String purgeTime = @@ -338,10 +331,11 @@ public class PurgeMinionClusterIntegrationTest extends BaseClusterIntegrationTes // schedule purge tasks String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE); - assertNotNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE)); + assertNotNull( + _taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.PurgeTask.TASK_TYPE)); assertTrue(_helixTaskResourceManager.getTaskQueues() .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.PurgeTask.TASK_TYPE))); - assertNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE)); + assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.PurgeTask.TASK_TYPE)); waitForTaskToComplete(); // Check that metadata contains expected values diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java index 043c654ef7..e6c8ce2700 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java @@ -134,14 +134,14 @@ public class RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends BaseC Map<String, String> taskConfigsWithMetadata = new HashMap<>(); taskConfigsWithMetadata.put(BatchConfigProperties.OVERWRITE_OUTPUT, "true"); - taskConfigsWithMetadata.put( - BatchConfigProperties.PUSH_MODE, BatchConfigProperties.SegmentPushType.METADATA.toString()); + taskConfigsWithMetadata.put(BatchConfigProperties.PUSH_MODE, + BatchConfigProperties.SegmentPushType.METADATA.toString()); String tableWithMetadataPush = "myTable2"; schema.setSchemaName(tableWithMetadataPush); addSchema(schema); TableConfig realtimeMetadataTableConfig = createRealtimeTableConfig(avroFiles.get(0), tableWithMetadataPush, - new TableTaskConfig(Collections.singletonMap( - MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, taskConfigsWithMetadata))); + new TableTaskConfig(Collections.singletonMap(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, + taskConfigsWithMetadata))); realtimeMetadataTableConfig.setIngestionConfig(ingestionConfig); realtimeMetadataTableConfig.setFieldConfigList(Collections.singletonList(tsFieldConfig)); addTableConfig(realtimeMetadataTableConfig); @@ -151,7 +151,6 @@ public class RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends BaseC offlineMetadataTableConfig.setFieldConfigList(Collections.singletonList(tsFieldConfig)); addTableConfig(offlineMetadataTableConfig); - // Push data into Kafka pushAvroIntoKafka(avroFiles); @@ -163,7 +162,6 @@ public class RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends BaseC waitForDocsLoaded(600_000L, true, tableWithMetadataPush); - _taskResourceManager = _controllerStarter.getHelixTaskResourceManager(); _taskManager = _controllerStarter.getTaskManager(); _realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(getTableName()); @@ -181,8 +179,8 @@ public class RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends BaseC } _dataSmallestTimeMs = minSegmentTimeMs; - segmentsZKMetadata = _helixResourceManager.getSegmentsZKMetadata(_realtimeMetadataTableName); - minSegmentTimeMs = Long.MAX_VALUE; + segmentsZKMetadata = _helixResourceManager.getSegmentsZKMetadata(_realtimeMetadataTableName); + minSegmentTimeMs = Long.MAX_VALUE; for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) { if (segmentZKMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.DONE) { minSegmentTimeMs = Math.min(minSegmentTimeMs, segmentZKMetadata.getStartTimeMs()); @@ -193,29 +191,28 @@ public class RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends BaseC private TableConfig createOfflineTableConfig(String tableName, @Nullable TableTaskConfig taskConfig, @Nullable SegmentPartitionConfig partitionConfig) { - return new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName) - .setTimeColumnName(getTimeColumnName()).setSortedColumn(getSortedColumn()) - .setInvertedIndexColumns(getInvertedIndexColumns()).setNoDictionaryColumns(getNoDictionaryColumns()) - .setRangeIndexColumns(getRangeIndexColumns()).setBloomFilterColumns(getBloomFilterColumns()) - .setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion()) - .setLoadMode(getLoadMode()).setTaskConfig(taskConfig).setBrokerTenant(getBrokerTenant()) - .setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig()) - .setNullHandlingEnabled(getNullHandlingEnabled()).setSegmentPartitionConfig(partitionConfig).build(); + return new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName).setTimeColumnName(getTimeColumnName()) + .setSortedColumn(getSortedColumn()).setInvertedIndexColumns(getInvertedIndexColumns()) + .setNoDictionaryColumns(getNoDictionaryColumns()).setRangeIndexColumns(getRangeIndexColumns()) + .setBloomFilterColumns(getBloomFilterColumns()).setFieldConfigList(getFieldConfigs()) + .setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode()) + .setTaskConfig(taskConfig).setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant()) + .setIngestionConfig(getIngestionConfig()).setNullHandlingEnabled(getNullHandlingEnabled()) + .setSegmentPartitionConfig(partitionConfig).build(); } protected TableConfig createRealtimeTableConfig(File sampleAvroFile, String tableName, TableTaskConfig taskConfig) { AvroFileSchemaKafkaAvroMessageDecoder._avroFile = sampleAvroFile; - return new TableConfigBuilder(TableType.REALTIME).setTableName(tableName) - .setTimeColumnName(getTimeColumnName()).setSortedColumn(getSortedColumn()) - .setInvertedIndexColumns(getInvertedIndexColumns()).setNoDictionaryColumns(getNoDictionaryColumns()) - .setRangeIndexColumns(getRangeIndexColumns()).setBloomFilterColumns(getBloomFilterColumns()) - .setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion()) - .setLoadMode(getLoadMode()).setTaskConfig(taskConfig).setBrokerTenant(getBrokerTenant()) - .setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig()).setQueryConfig(getQueryConfig()) - .setStreamConfigs(getStreamConfigs()).setNullHandlingEnabled(getNullHandlingEnabled()).build(); + return new TableConfigBuilder(TableType.REALTIME).setTableName(tableName).setTimeColumnName(getTimeColumnName()) + .setSortedColumn(getSortedColumn()).setInvertedIndexColumns(getInvertedIndexColumns()) + .setNoDictionaryColumns(getNoDictionaryColumns()).setRangeIndexColumns(getRangeIndexColumns()) + .setBloomFilterColumns(getBloomFilterColumns()).setFieldConfigList(getFieldConfigs()) + .setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode()) + .setTaskConfig(taskConfig).setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant()) + .setIngestionConfig(getIngestionConfig()).setQueryConfig(getQueryConfig()).setStreamConfigs(getStreamConfigs()) + .setNullHandlingEnabled(getNullHandlingEnabled()).build(); } - @Test public void testRealtimeToOfflineSegmentsTask() throws Exception { @@ -234,12 +231,12 @@ public class RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends BaseC long expectedWatermark = _dataSmallestTimeMs + 86400000; for (int i = 0; i < 3; i++) { // Schedule task - assertNotNull(_taskManager.scheduleTasks(_realtimeTableName) + assertNotNull(_taskManager.scheduleAllTasksForTable(_realtimeTableName, null) .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)); assertTrue(_taskResourceManager.getTaskQueues().contains( PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE))); // Should not generate more tasks - assertNull(_taskManager.scheduleTasks(_realtimeTableName) + assertNull(_taskManager.scheduleAllTasksForTable(_realtimeTableName, null) .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)); // Wait at most 600 seconds for all tasks COMPLETED @@ -286,12 +283,12 @@ public class RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends BaseC _taskManager.cleanUpTask(); for (int i = 0; i < 3; i++) { // Schedule task - assertNotNull(_taskManager.scheduleTasks(_realtimeMetadataTableName) + assertNotNull(_taskManager.scheduleAllTasksForTable(_realtimeMetadataTableName, null) .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)); assertTrue(_taskResourceManager.getTaskQueues().contains( PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE))); // Should not generate more tasks - assertNull(_taskManager.scheduleTasks(_realtimeMetadataTableName) + assertNull(_taskManager.scheduleAllTasksForTable(_realtimeMetadataTableName, null) .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)); // Wait at most 600 seconds for all tasks COMPLETED diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java index 241c1c0876..78aa4d1c24 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java @@ -87,8 +87,8 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest { properties.put(TASK_TYPE + MinionConstants.MAX_ATTEMPTS_PER_TASK_KEY_SUFFIX, "2"); helixResourceManager.getHelixAdmin().setConfig( - new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER) - .forCluster(helixResourceManager.getHelixClusterName()).build(), properties); + new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster( + helixResourceManager.getHelixClusterName()).build(), properties); // Add 3 offline tables, where 2 of them have TestTask enabled addDummySchema(TABLE_NAME_1); @@ -136,7 +136,7 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest { assertEquals(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).size(), 0); // Should create the task queues and generate a task in the same minion instance - List<String> task1 = _taskManager.scheduleTasks().get(TASK_TYPE); + List<String> task1 = _taskManager.scheduleAllTasksForAllTables(null).get(TASK_TYPE); assertNotNull(task1); assertEquals(task1.size(), 1); assertTrue(_helixTaskResourceManager.getTaskQueues() @@ -150,7 +150,7 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest { verifyTaskCount(task1.get(0), 0, 1, 1, 2); // Should generate one more task, with two sub-tasks. Both of these sub-tasks will wait // since we have one minion instance that is still running one of the sub-tasks. - List<String> task2 = _taskManager.scheduleTask(TASK_TYPE, null); + List<String> task2 = _taskManager.scheduleTaskForAllTables(TASK_TYPE, null); assertNotNull(task2); assertEquals(task2.size(), 1); assertTrue(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).contains(task2.get(0))); @@ -159,8 +159,8 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest { // Should not generate more tasks since SimpleMinionClusterIntegrationTests.NUM_TASKS is 2. // Our test task generator does not generate if there are already this many sub-tasks in the // running+waiting count already. - assertNull(_taskManager.scheduleTasks().get(TASK_TYPE)); - assertNull(_taskManager.scheduleTask(TASK_TYPE, null)); + assertNull(_taskManager.scheduleAllTasksForAllTables(null).get(TASK_TYPE)); + assertNull(_taskManager.scheduleTaskForAllTables(TASK_TYPE, null)); // Wait at most 60 seconds for all tasks IN_PROGRESS TestUtils.waitForCondition(input -> { @@ -183,13 +183,12 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest { String inProgressGauge = TASK_TYPE + "." + TaskState.IN_PROGRESS; String stoppedGauge = TASK_TYPE + "." + TaskState.STOPPED; String completedGauge = TASK_TYPE + "." + TaskState.COMPLETED; - TestUtils.waitForCondition( - input -> MetricValueUtils.getGlobalGaugeValue(controllerMetrics, inProgressGauge, ControllerGauge.TASK_STATUS) + TestUtils.waitForCondition(input -> + MetricValueUtils.getGlobalGaugeValue(controllerMetrics, inProgressGauge, ControllerGauge.TASK_STATUS) == NUM_TASKS && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, stoppedGauge, ControllerGauge.TASK_STATUS) == 0 && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, completedGauge, ControllerGauge.TASK_STATUS) - == 0, - ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller gauges"); + == 0, ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller gauges"); // Stop the task queue _helixTaskResourceManager.stopTaskQueue(TASK_TYPE); @@ -211,14 +210,12 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest { }, STATE_TRANSITION_TIMEOUT_MS, "Failed to get all tasks STOPPED"); // Wait at most 30 seconds for ZK callback to update the controller gauges - TestUtils.waitForCondition( - input -> MetricValueUtils.getGlobalGaugeValue(controllerMetrics, inProgressGauge, ControllerGauge.TASK_STATUS) - == 0 + TestUtils.waitForCondition(input -> + MetricValueUtils.getGlobalGaugeValue(controllerMetrics, inProgressGauge, ControllerGauge.TASK_STATUS) == 0 && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, stoppedGauge, ControllerGauge.TASK_STATUS) == NUM_TASKS && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, completedGauge, ControllerGauge.TASK_STATUS) - == 0, - ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller gauges"); + == 0, ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller gauges"); // Task deletion requires the task queue to be stopped, // so deleting task1 here before resuming the task queue. @@ -247,13 +244,11 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest { }, STATE_TRANSITION_TIMEOUT_MS, "Failed to get all tasks COMPLETED"); // Wait at most 30 seconds for ZK callback to update the controller gauges - TestUtils.waitForCondition( - input -> MetricValueUtils.getGlobalGaugeValue(controllerMetrics, inProgressGauge, ControllerGauge.TASK_STATUS) - == 0 + TestUtils.waitForCondition(input -> + MetricValueUtils.getGlobalGaugeValue(controllerMetrics, inProgressGauge, ControllerGauge.TASK_STATUS) == 0 && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, stoppedGauge, ControllerGauge.TASK_STATUS) == 0 - && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, completedGauge, ControllerGauge.TASK_STATUS) - == (NUM_TASKS - 1), - ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller gauges"); + && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, completedGauge, ControllerGauge.TASK_STATUS) == ( + NUM_TASKS - 1), ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller gauges"); // Delete the task queue _helixTaskResourceManager.deleteTaskQueue(TASK_TYPE, false); @@ -263,13 +258,11 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest { STATE_TRANSITION_TIMEOUT_MS, "Failed to delete the task queue"); // Wait at most 30 seconds for ZK callback to update the controller gauges - TestUtils.waitForCondition( - input -> MetricValueUtils.getGlobalGaugeValue(controllerMetrics, inProgressGauge, ControllerGauge.TASK_STATUS) - == 0 + TestUtils.waitForCondition(input -> + MetricValueUtils.getGlobalGaugeValue(controllerMetrics, inProgressGauge, ControllerGauge.TASK_STATUS) == 0 && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, stoppedGauge, ControllerGauge.TASK_STATUS) == 0 && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, completedGauge, ControllerGauge.TASK_STATUS) - == 0, - ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller gauges"); + == 0, ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller gauges"); } @AfterClass diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TlsIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TlsIntegrationTest.java index d292ef4c9b..5058fd4b75 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TlsIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TlsIntegrationTest.java @@ -489,7 +489,7 @@ public class TlsIntegrationTest extends BaseClusterIntegrationTest { Assert.assertTrue(resultBeforeOffline.getResultSet(0).getLong(0) > 0); // schedule offline segment generation - Assert.assertNotNull(_controllerStarter.getTaskManager().scheduleTasks()); + Assert.assertNotNull(_controllerStarter.getTaskManager().scheduleAllTasksForAllTables(null)); // wait for offline segments JsonNode offlineSegments = TestUtils.waitForResult(() -> { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java index 238d515b54..19c3ac61ff 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java @@ -471,8 +471,8 @@ public class UpsertTableIntegrationTest extends BaseClusterIntegrationTestSet { waitForAllDocsLoaded(tableName, 600_000L, 1000); assertEquals(getScore(tableName), 3692); waitForNumQueriedSegmentsToConverge(tableName, 10_000L, 3); - - assertNotNull(_taskManager.scheduleTasks(TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName)) + String realtimeTableName = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName); + assertNotNull(_taskManager.scheduleAllTasksForTable(realtimeTableName, null) .get(MinionConstants.UpsertCompactionTask.TASK_TYPE)); waitForTaskToComplete(); waitForAllDocsLoaded(tableName, 600_000L, 3); @@ -501,8 +501,8 @@ public class UpsertTableIntegrationTest extends BaseClusterIntegrationTestSet { waitForAllDocsLoaded(tableName, 600_000L, 2000); assertEquals(getScore(tableName), 3692); waitForNumQueriedSegmentsToConverge(tableName, 10_000L, 5); - - assertNotNull(_taskManager.scheduleTasks(TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName)) + String realtimeTableName = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName); + assertNotNull(_taskManager.scheduleAllTasksForTable(realtimeTableName, null) .get(MinionConstants.UpsertCompactionTask.TASK_TYPE)); waitForTaskToComplete(); waitForAllDocsLoaded(tableName, 600_000L, 3); @@ -546,7 +546,8 @@ public class UpsertTableIntegrationTest extends BaseClusterIntegrationTestSet { // Run segment compaction. This time, we expect that the deleting rows are still there because they are // as part of the consuming segment - assertNotNull(_taskManager.scheduleTasks(TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName)) + String realtimeTableName = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName); + assertNotNull(_taskManager.scheduleAllTasksForTable(realtimeTableName, null) .get(MinionConstants.UpsertCompactionTask.TASK_TYPE)); waitForTaskToComplete(); waitForAllDocsLoaded(tableName, 600_000L, 3); @@ -563,7 +564,8 @@ public class UpsertTableIntegrationTest extends BaseClusterIntegrationTestSet { assertEquals(getNumDeletedRows(tableName), 2); // Run segment compaction. This time, we expect that the deleting rows are cleaned up - assertNotNull(_taskManager.scheduleTasks(TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName)) + realtimeTableName = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName); + assertNotNull(_taskManager.scheduleAllTasksForTable(realtimeTableName, null) .get(MinionConstants.UpsertCompactionTask.TASK_TYPE)); waitForTaskToComplete(); waitForAllDocsLoaded(tableName, 600_000L, 3); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UrlAuthRealtimeIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UrlAuthRealtimeIntegrationTest.java index e8389b377f..08aa9aee6a 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UrlAuthRealtimeIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UrlAuthRealtimeIntegrationTest.java @@ -203,7 +203,7 @@ public class UrlAuthRealtimeIntegrationTest extends BaseClusterIntegrationTest { Assert.assertTrue(resultBeforeOffline.getResultSet(0).getLong(0) > 0); // schedule offline segment generation - Assert.assertNotNull(_controllerStarter.getTaskManager().scheduleTasks()); + Assert.assertNotNull(_controllerStarter.getTaskManager().scheduleAllTasksForAllTables(null)); // wait for offline segments JsonNode offlineSegments = TestUtils.waitForResult(() -> { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org