This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 2ea0185 Enhance task schedule api for single type/table support (#6352) 2ea0185 is described below commit 2ea01854866482fe8206a55dec4f2f1b2a648874 Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Thu Dec 17 10:35:22 2020 -0800 Enhance task schedule api for single type/table support (#6352) Added 2 optional query parameters to the task schedule API to schedule tasks for the given task type/table. Examples: - POST /tasks/schedule?taskType=MyTask - POST /tasks/schedule?tableName=myTable_OFFLINE - POST /tasks/schedule?taskType=MyTask&tableName=myTable_OFFLINE --- .../apache/pinot/common/minion/MinionClient.java | 17 ++- .../common/minion/MinionRequestURLBuilder.java | 14 +- .../pinot/common/minion/MinionClientTest.java | 2 +- .../api/resources/PinotTaskRestletResource.java | 16 +- .../helix/core/minion/PinotTaskManager.java | 169 ++++++++++++++------- ...vertToRawIndexMinionClusterIntegrationTest.java | 6 +- ...fflineSegmentsMinionClusterIntegrationTest.java | 6 +- .../tests/SimpleMinionClusterIntegrationTest.java | 17 +-- 8 files changed, 157 insertions(+), 90 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionClient.java b/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionClient.java index beb98d5..dffb841 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionClient.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionClient.java @@ -18,8 +18,10 @@ */ package org.apache.pinot.common.minion; +import com.fasterxml.jackson.core.type.TypeReference; import java.io.IOException; import java.util.Map; +import javax.annotation.Nullable; import org.apache.commons.httpclient.HttpException; import org.apache.commons.io.IOUtils; import org.apache.http.HttpResponse; @@ -30,8 +32,6 @@ import org.apache.http.impl.client.HttpClientBuilder; import org.apache.pinot.spi.annotations.InterfaceAudience; import org.apache.pinot.spi.annotations.InterfaceStability; import org.apache.pinot.spi.utils.JsonUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** @@ -42,8 +42,6 @@ import org.slf4j.LoggerFactory; @InterfaceAudience.Private @InterfaceStability.Evolving public class MinionClient { - - private static final Logger LOGGER = LoggerFactory.getLogger(MinionClient.class); private static final CloseableHttpClient HTTP_CLIENT = HttpClientBuilder.create().build(); private static final String ACCEPT = "accept"; private static final String APPLICATION_JSON = "application/json"; @@ -67,9 +65,10 @@ public class MinionClient { return _controllerUrl; } - public Map<String, String> scheduleMinionTasks() + public Map<String, String> scheduleMinionTasks(@Nullable String taskType, @Nullable String tableNameWithType) throws IOException { - HttpPost httpPost = createHttpPostRequest(MinionRequestURLBuilder.baseUrl(getControllerUrl()).forTaskSchedule()); + HttpPost httpPost = createHttpPostRequest( + MinionRequestURLBuilder.baseUrl(getControllerUrl()).forTaskSchedule(taskType, tableNameWithType)); HttpResponse response = HTTP_CLIENT.execute(httpPost); int statusCode = response.getStatusLine().getStatusCode(); final String responseString = IOUtils.toString(response.getEntity().getContent()); @@ -77,7 +76,8 @@ public class MinionClient { throw new HttpException(String .format("Unable to schedule minion tasks. Error code %d, Error message: %s", statusCode, responseString)); } - return JsonUtils.stringToObject(responseString, Map.class); + return JsonUtils.stringToObject(responseString, new TypeReference<Map<String, String>>() { + }); } public Map<String, String> getTasksStates(String taskType) @@ -91,7 +91,8 @@ public class MinionClient { throw new HttpException(String .format("Unable to get tasks states map. Error code %d, Error message: %s", statusCode, responseString)); } - return JsonUtils.stringToObject(responseString, Map.class); + return JsonUtils.stringToObject(responseString, new TypeReference<Map<String, String>>() { + }); } public String getTaskState(String taskName) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionRequestURLBuilder.java b/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionRequestURLBuilder.java index 4432bc9..01e8f2f 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionRequestURLBuilder.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionRequestURLBuilder.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.common.minion; +import javax.annotation.Nullable; import org.apache.commons.lang3.StringUtils; import org.apache.pinot.common.utils.StringUtil; @@ -37,8 +38,17 @@ public class MinionRequestURLBuilder { return new MinionRequestURLBuilder(baseUrl); } - public String forTaskSchedule() { - return StringUtil.join("/", _baseUrl, "tasks/schedule"); + public String forTaskSchedule(@Nullable String taskType, @Nullable String tableNameWithType) { + String url = StringUtil.join("/", _baseUrl, "tasks/schedule"); + if (taskType != null && tableNameWithType != null) { + return url + "?taskType=" + taskType + "&tableName=" + tableNameWithType; + } else if (taskType != null) { + return url + "?taskType=" + taskType; + } else if (tableNameWithType != null) { + return url + "?tableName=" + tableNameWithType; + } else { + return url; + } } public String forListAllTasks(String taskType) { diff --git a/pinot-common/src/test/java/org/apache/pinot/common/minion/MinionClientTest.java b/pinot-common/src/test/java/org/apache/pinot/common/minion/MinionClientTest.java index 869dde1..eddb888 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/minion/MinionClientTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/minion/MinionClientTest.java @@ -63,7 +63,7 @@ public class MinionClientTest { createHandler(200, "{\"SegmentGenerationAndPushTask\":\"Task_SegmentGenerationAndPushTask_1607470525615\"}", 0)); MinionClient minionClient = new MinionClient("localhost", "14202"); - Assert.assertEquals(minionClient.scheduleMinionTasks().get("SegmentGenerationAndPushTask"), + Assert.assertEquals(minionClient.scheduleMinionTasks(null, null).get("SegmentGenerationAndPushTask"), "Task_SegmentGenerationAndPushTask_1607470525615"); httpServer.stop(0); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java index 745ff30..aa3235e 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java @@ -21,6 +21,7 @@ package org.apache.pinot.controller.api.resources; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; @@ -168,9 +169,18 @@ public class PinotTaskRestletResource { @POST @Path("/tasks/schedule") - @ApiOperation("Schedule tasks") - public Map<String, String> scheduleTasks() { - return _pinotTaskManager.scheduleTasks(); + @ApiOperation("Schedule tasks and return a map from task type to task name scheduled") + public Map<String, String> scheduleTasks(@ApiParam(value = "Task type") @QueryParam("taskType") String taskType, + @ApiParam(value = "Table name (with type suffix)") @QueryParam("tableName") String tableName) { + if (taskType != null) { + // Schedule task for the given task type + String taskName = tableName != null ? _pinotTaskManager.scheduleTask(taskType, tableName) + : _pinotTaskManager.scheduleTask(taskType); + return Collections.singletonMap(taskType, taskName); + } else { + // Schedule tasks for all task types + return tableName != null ? _pinotTaskManager.scheduleTasks(tableName) : _pinotTaskManager.scheduleTasks(); + } } @Deprecated diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java index 7a17718..fe97326 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java @@ -18,11 +18,14 @@ */ package org.apache.pinot.controller.helix.core.minion; +import com.google.common.base.Preconditions; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import javax.annotation.Nullable; import org.apache.pinot.common.metrics.ControllerMeter; import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.controller.ControllerConf; @@ -33,7 +36,6 @@ import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorRegi import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask; import org.apache.pinot.core.minion.PinotTaskConfig; import org.apache.pinot.spi.config.table.TableConfig; -import org.apache.pinot.spi.config.table.TableTaskConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,11 +65,8 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { } /** - * Returns the cluster info provider. - * <p> - * Cluster info provider might be useful when initializing task generators. - * - * @return Cluster info provider + * Returns the cluster info accessor. + * <p>Cluster info accessor can be used to initialize the task generator. */ public ClusterInfoAccessor getClusterInfoAccessor() { return _clusterInfoAccessor; @@ -75,87 +74,139 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { /** * Registers a task generator. - * <p> - * This method can be used to plug in custom task generators. - * - * @param pinotTaskGenerator Task generator to be registered + * <p>This method can be used to plug in custom task generators. */ - public void registerTaskGenerator(PinotTaskGenerator pinotTaskGenerator) { - _taskGeneratorRegistry.registerTaskGenerator(pinotTaskGenerator); + public void registerTaskGenerator(PinotTaskGenerator taskGenerator) { + _taskGeneratorRegistry.registerTaskGenerator(taskGenerator); } /** - * Public API to schedule tasks. It doesn't matter whether current pinot controller is leader. + * Public API to schedule tasks (all task types) for all tables. It might be called from the non-leader controller. + * Returns a map from the task type to the task scheduled. */ public synchronized Map<String, String> scheduleTasks() { - Map<String, String> tasksScheduled = scheduleTasks(_pinotHelixResourceManager.getAllTables()); - - // Reset the task because this method will be called from the Rest API instead of the periodic task scheduler - // TODO: Clean up only the non-leader tables instead of all tables - cleanUpTask(); - setUpTask(); - - return tasksScheduled; + return scheduleTasks(_pinotHelixResourceManager.getAllTables(), false); } /** - * Check the Pinot cluster status and schedule new tasks for the given tables. - * - * @param tableNamesWithType List of table names with type suffix - * @return Map from task type to task scheduled + * Helper method to schedule tasks (all task types) for the given tables that have the tasks enabled. Returns a map + * from the task type to the task scheduled. */ - private synchronized Map<String, String> scheduleTasks(List<String> tableNamesWithType) { + private synchronized Map<String, String> scheduleTasks(List<String> tableNamesWithType, boolean isLeader) { _controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED, 1L); - Set<String> taskTypes = _taskGeneratorRegistry.getAllTaskTypes(); - int numTaskTypes = taskTypes.size(); - Map<String, List<TableConfig>> enabledTableConfigMap = new HashMap<>(numTaskTypes); - - for (String taskType : taskTypes) { - enabledTableConfigMap.put(taskType, new ArrayList<>()); - - // Ensure all task queues exist - _helixTaskResourceManager.ensureTaskQueueExists(taskType); - } - // Scan all table configs to get the tables with tasks enabled + Map<String, List<TableConfig>> enabledTableConfigMap = new HashMap<>(); for (String tableNameWithType : tableNamesWithType) { TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType); - if (tableConfig != null) { - TableTaskConfig taskConfig = tableConfig.getTaskConfig(); - if (taskConfig != null) { - for (String taskType : taskTypes) { - if (taskConfig.isTaskTypeEnabled(taskType)) { - enabledTableConfigMap.get(taskType).add(tableConfig); - } - } + if (tableConfig != null && tableConfig.getTaskConfig() != null) { + Set<String> enabledTaskTypes = tableConfig.getTaskConfig().getTaskTypeConfigsMap().keySet(); + for (String enabledTaskType : enabledTaskTypes) { + enabledTableConfigMap.computeIfAbsent(enabledTaskType, k -> new ArrayList<>()).add(tableConfig); } } } // Generate each type of tasks - Map<String, String> tasksScheduled = new HashMap<>(numTaskTypes); - for (String taskType : taskTypes) { - LOGGER.info("Generating tasks for task type: {}", taskType); - PinotTaskGenerator pinotTaskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType); - List<PinotTaskConfig> pinotTaskConfigs = pinotTaskGenerator.generateTasks(enabledTableConfigMap.get(taskType)); - int numTasks = pinotTaskConfigs.size(); - if (numTasks > 0) { - LOGGER - .info("Submitting {} tasks for task type: {} with task configs: {}", numTasks, taskType, pinotTaskConfigs); - tasksScheduled.put(taskType, _helixTaskResourceManager - .submitTask(pinotTaskConfigs, pinotTaskGenerator.getTaskTimeoutMs(), - pinotTaskGenerator.getNumConcurrentTasksPerInstance())); - _controllerMetrics.addMeteredTableValue(taskType, ControllerMeter.NUMBER_TASKS_SUBMITTED, numTasks); + Map<String, String> tasksScheduled = new HashMap<>(); + for (Map.Entry<String, List<TableConfig>> entry : enabledTableConfigMap.entrySet()) { + String taskType = entry.getKey(); + List<TableConfig> enabledTableConfigs = entry.getValue(); + PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType); + if (taskGenerator != null) { + _helixTaskResourceManager.ensureTaskQueueExists(taskType); + tasksScheduled.put(taskType, scheduleTask(taskGenerator, enabledTableConfigs, isLeader)); + } else { + List<String> enabledTables = new ArrayList<>(enabledTableConfigs.size()); + for (TableConfig enabledTableConfig : enabledTableConfigs) { + enabledTables.add(enabledTableConfig.getTableName()); + } + LOGGER.warn("Task type: {} is not registered, cannot enable it for tables: {}", taskType, enabledTables); + tasksScheduled.put(taskType, null); } } return tasksScheduled; } + /** + * Helper method to schedule task with the given task generator for the given tables that have the task enabled. + * Returns the task name, or {@code null} if no task is scheduled. + */ + @Nullable + private String scheduleTask(PinotTaskGenerator taskGenerator, List<TableConfig> enabledTableConfigs, + boolean isLeader) { + List<PinotTaskConfig> pinotTaskConfigs = taskGenerator.generateTasks(enabledTableConfigs); + if (!isLeader) { + taskGenerator.nonLeaderCleanUp(); + } + int numTasks = pinotTaskConfigs.size(); + if (numTasks > 0) { + String taskType = taskGenerator.getTaskType(); + LOGGER.info("Submitting {} tasks for task type: {} with task configs: {}", numTasks, taskType, pinotTaskConfigs); + _controllerMetrics.addMeteredTableValue(taskType, ControllerMeter.NUMBER_TASKS_SUBMITTED, numTasks); + return _helixTaskResourceManager.submitTask(pinotTaskConfigs, taskGenerator.getTaskTimeoutMs(), + taskGenerator.getNumConcurrentTasksPerInstance()); + } else { + return null; + } + } + + /** + * Public API to schedule tasks (all task types) for the given table. It might be called from the non-leader + * controller. Returns a map from the task type to the task scheduled. + */ + @Nullable + public synchronized Map<String, String> scheduleTasks(String tableNameWithType) { + return scheduleTasks(Collections.singletonList(tableNameWithType), false); + } + + /** + * Public API to schedule task for the given task type. It might be called from the non-leader controller. Returns the + * task name, or {@code null} if no task is scheduled. + */ + @Nullable + public synchronized String scheduleTask(String taskType) { + PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType); + Preconditions.checkState(taskGenerator != null, "Task type: %s is not registered", taskType); + + // Scan all table configs to get the tables with task enabled + List<TableConfig> enabledTableConfigs = new ArrayList<>(); + for (String tableNameWithType : _pinotHelixResourceManager.getAllTables()) { + TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType); + if (tableConfig != null && tableConfig.getTaskConfig() != null && tableConfig.getTaskConfig() + .isTaskTypeEnabled(taskType)) { + enabledTableConfigs.add(tableConfig); + } + } + + _helixTaskResourceManager.ensureTaskQueueExists(taskType); + return scheduleTask(taskGenerator, enabledTableConfigs, false); + } + + /** + * Public API to schedule task for the given task type on the given table. It might be called from the non-leader + * controller. Returns the task name, or {@code null} if no task is scheduled. + */ + @Nullable + public synchronized String scheduleTask(String taskType, String tableNameWithType) { + PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType); + Preconditions.checkState(taskGenerator != null, "Task type: %s is not registered", taskType); + + TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType); + Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", tableNameWithType); + + Preconditions + .checkState(tableConfig.getTaskConfig() != null && tableConfig.getTaskConfig().isTaskTypeEnabled(taskType), + "Table: %s does not have task type: %s enabled", tableNameWithType, taskType); + + _helixTaskResourceManager.ensureTaskQueueExists(taskType); + return scheduleTask(taskGenerator, Collections.singletonList(tableConfig), false); + } + @Override protected void processTables(List<String> tableNamesWithType) { - scheduleTasks(tableNamesWithType); + scheduleTasks(tableNamesWithType, true); } @Override diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java index 9fa9519..6848da8 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java @@ -109,15 +109,15 @@ public class ConvertToRawIndexMinionClusterIntegrationTest extends HybridCluster } // Should create the task queues and generate a ConvertToRawIndexTask task with 5 child tasks - Assert.assertTrue(_taskManager.scheduleTasks().containsKey(ConvertToRawIndexTask.TASK_TYPE)); + Assert.assertNotNull(_taskManager.scheduleTasks().get(ConvertToRawIndexTask.TASK_TYPE)); Assert.assertTrue(_helixTaskResourceManager.getTaskQueues() .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(ConvertToRawIndexTask.TASK_TYPE))); // Should generate one more ConvertToRawIndexTask task with 3 child tasks - Assert.assertTrue(_taskManager.scheduleTasks().containsKey(ConvertToRawIndexTask.TASK_TYPE)); + Assert.assertNotNull(_taskManager.scheduleTasks().get(ConvertToRawIndexTask.TASK_TYPE)); // Should not generate more tasks - Assert.assertFalse(_taskManager.scheduleTasks().containsKey(ConvertToRawIndexTask.TASK_TYPE)); + Assert.assertNull(_taskManager.scheduleTasks().get(ConvertToRawIndexTask.TASK_TYPE)); // Wait at most 600 seconds for all tasks COMPLETED and new segments refreshed TestUtils.waitForCondition(input -> { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java index 3f80e95..6079bfd 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java @@ -106,13 +106,11 @@ public class RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends Realt long offlineSegmentTime = _dateSmallestDays; for (int i = 0; i < 3; i++) { // Schedule task - Assert.assertTrue( - _taskManager.scheduleTasks().containsKey(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)); + Assert.assertNotNull(_taskManager.scheduleTasks().get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)); Assert.assertTrue(_helixTaskResourceManager.getTaskQueues().contains( PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE))); // Should not generate more tasks - Assert.assertFalse( - _taskManager.scheduleTasks().containsKey(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)); + Assert.assertNull(_taskManager.scheduleTasks().get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)); expectedWatermark = expectedWatermark + 86400000; // Wait at most 600 seconds for all tasks COMPLETED diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java index 5232b7a..e0a4ecc 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java @@ -48,10 +48,7 @@ import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.assertTrue; +import static org.testng.Assert.*; /** @@ -109,15 +106,16 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest { HOLD.set(true); // Should create the task queues and generate a task - assertTrue(_taskManager.scheduleTasks().containsKey(TestTaskGenerator.TASK_TYPE)); + assertNotNull(_taskManager.scheduleTasks().get(TestTaskGenerator.TASK_TYPE)); assertTrue(_helixTaskResourceManager.getTaskQueues() .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(TestTaskGenerator.TASK_TYPE))); // Should generate one more task - assertTrue(_taskManager.scheduleTasks().containsKey(TestTaskGenerator.TASK_TYPE)); + assertNotNull(_taskManager.scheduleTask(TestTaskGenerator.TASK_TYPE)); // Should not generate more tasks - assertFalse(_taskManager.scheduleTasks().containsKey(TestTaskGenerator.TASK_TYPE)); + assertNull(_taskManager.scheduleTasks().get(TestTaskGenerator.TASK_TYPE)); + assertNull(_taskManager.scheduleTask(TestTaskGenerator.TASK_TYPE)); // Wait at most 60 seconds for all tasks IN_PROGRESS TestUtils.waitForCondition(input -> { @@ -178,9 +176,8 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest { _helixTaskResourceManager.deleteTaskQueue(TestTaskGenerator.TASK_TYPE, false); // Wait at most 60 seconds for task queue to be deleted - TestUtils.waitForCondition(input -> { - return !_helixTaskResourceManager.getTaskTypes().contains(TestTaskGenerator.TASK_TYPE); - }, STATE_TRANSITION_TIMEOUT_MS, "Failed to delete the task queue"); + TestUtils.waitForCondition(input -> !_helixTaskResourceManager.getTaskTypes().contains(TestTaskGenerator.TASK_TYPE), + STATE_TRANSITION_TIMEOUT_MS, "Failed to delete the task queue"); } @AfterClass --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org