This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 8e21aec1c2 Add endpoints for some finer control on minion tasks (#8486) 8e21aec1c2 is described below commit 8e21aec1c2308b8e6b37f8086a696a72c4e03252 Author: Xiaobing <61892277+klsi...@users.noreply.github.com> AuthorDate: Fri Apr 8 11:27:46 2022 -0700 Add endpoints for some finer control on minion tasks (#8486) - allow to delete individual tasks w/o deleting the task queue - inspect the states or configs of the subtasks of a task --- .../api/resources/PinotTaskRestletResource.java | 32 +++++++++ .../core/minion/PinotHelixTaskResourceManager.java | 76 ++++++++++++++++++++++ .../tests/SimpleMinionClusterIntegrationTest.java | 18 +++-- 3 files changed, 119 insertions(+), 7 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java index a36a41fafe..12b16edca6 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java @@ -27,6 +27,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import javax.annotation.Nullable; import javax.inject.Inject; import javax.ws.rs.DELETE; import javax.ws.rs.DefaultValue; @@ -37,6 +38,7 @@ import javax.ws.rs.PUT; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.QueryParam; +import org.apache.helix.task.TaskPartitionState; import org.apache.helix.task.TaskState; import org.apache.pinot.controller.api.access.AccessType; import org.apache.pinot.controller.api.access.Authenticate; @@ -205,6 +207,14 @@ public class PinotTaskRestletResource { return new StringResultResponse(_pinotHelixTaskResourceManager.getTaskState(taskName).toString()); } + @GET + @Path("/tasks/subtask/{taskName}/state") + @ApiOperation("Get the states of all the sub tasks for the given task") + public Map<String, TaskPartitionState> getSubtaskStates( + @ApiParam(value = "Task name", required = true) @PathParam("taskName") String taskName) { + return _pinotHelixTaskResourceManager.getSubtaskStates(taskName); + } + @GET @Path("/tasks/task/{taskName}/config") @ApiOperation("Get the task config (a list of child task configs) for the given task") @@ -222,6 +232,16 @@ public class PinotTaskRestletResource { return _pinotHelixTaskResourceManager.getTaskConfigs(taskName); } + @GET + @Path("/tasks/subtask/{taskName}/config") + @ApiOperation("Get the configs of specified sub tasks for the given task") + public Map<String, PinotTaskConfig> getSubtaskConfigs( + @ApiParam(value = "Task name", required = true) @PathParam("taskName") String taskName, + @ApiParam(value = "Sub task names separated by comma") @QueryParam("subtaskNames") @Nullable + String subtaskNames) { + return _pinotHelixTaskResourceManager.getSubtaskConfigs(taskName, subtaskNames); + } + @GET @Path("/tasks/scheduler/information") @ApiOperation("Fetch cron scheduler information") @@ -428,6 +448,18 @@ public class PinotTaskRestletResource { return new SuccessResponse("Successfully deleted tasks for task type: " + taskType); } + @DELETE + @Path("/tasks/task/{taskName}") + @Authenticate(AccessType.DELETE) + @ApiOperation("Delete a single task given its task name") + public SuccessResponse deleteTask( + @ApiParam(value = "Task name", required = true) @PathParam("taskName") String taskName, + @ApiParam(value = "Whether to force deleting the task (expert only option, enable with cautious") + @DefaultValue("false") @QueryParam("forceDelete") boolean forceDelete) { + _pinotHelixTaskResourceManager.deleteTask(taskName, forceDelete); + return new SuccessResponse("Successfully deleted task: " + taskName); + } + @Deprecated @DELETE @Path("/tasks/taskqueue/{taskType}") diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java index e825dbbca7..081377899d 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java @@ -24,6 +24,7 @@ import com.google.common.base.Preconditions; import com.google.common.util.concurrent.Uninterruptibles; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -31,6 +32,8 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import org.apache.commons.lang.StringUtils; import org.apache.helix.task.JobConfig; import org.apache.helix.task.JobContext; import org.apache.helix.task.JobQueue; @@ -162,6 +165,25 @@ public class PinotHelixTaskResourceManager { _taskDriver.delete(helixJobQueueName, forceDelete); } + /** + * Delete a single task from the task queue. The task queue should be + * stopped before deleting the task, otherwise it fails with exception. + * + * @param taskName the task to delete from the queue. + * @param forceDelete as said in helix comment, if set true, all job's related zk nodes will + * be clean up from zookeeper even if its workflow information can not be found. + */ + public synchronized void deleteTask(String taskName, boolean forceDelete) { + String taskType = getTaskType(taskName); + String helixJobQueueName = getHelixJobQueueName(taskType); + if (forceDelete) { + LOGGER.warn("Force deleting task: {} from queue: {} of task type: {}", taskName, helixJobQueueName, taskType); + } else { + LOGGER.info("Deleting task: {} from queue: {} of task type: {}", taskName, helixJobQueueName, taskType); + } + _taskDriver.deleteJob(helixJobQueueName, taskName, forceDelete); + } + /** * Get all task queues. * @@ -328,6 +350,33 @@ public class PinotHelixTaskResourceManager { return _taskDriver.getWorkflowContext(getHelixJobQueueName(taskType)).getJobState(getHelixJobName(taskName)); } + /** + * Get states of all the sub tasks for a given task. + * + * @param taskName the task whose sub tasks to check + * @return states of all the sub tasks + */ + public synchronized Map<String, TaskPartitionState> getSubtaskStates(String taskName) { + String taskType = getTaskType(taskName); + WorkflowContext workflowContext = _taskDriver.getWorkflowContext(getHelixJobQueueName(taskType)); + if (workflowContext == null) { + return Collections.emptyMap(); + } + String helixJobName = getHelixJobName(taskName); + JobContext jobContext = _taskDriver.getJobContext(helixJobName); + if (jobContext == null) { + return Collections.emptyMap(); + } + Map<String, TaskPartitionState> subtaskStates = new HashMap<>(); + Set<Integer> partitionSet = jobContext.getPartitionSet(); + for (int partition : partitionSet) { + String taskIdForPartition = jobContext.getTaskIdForPartition(partition); + TaskPartitionState partitionState = jobContext.getPartitionState(partition); + subtaskStates.put(taskIdForPartition, partitionState); + } + return subtaskStates; + } + /** * Get the child task configs for the given task name. * @@ -344,6 +393,33 @@ public class PinotHelixTaskResourceManager { return taskConfigs; } + /** + * Get configs of the specified sub task for a given task. + * + * @param taskName the task whose sub tasks to check + * @param subtaskNames the sub tasks to check + * @return the configs of the sub tasks + */ + public synchronized Map<String, PinotTaskConfig> getSubtaskConfigs(String taskName, @Nullable String subtaskNames) { + JobConfig jobConfig = _taskDriver.getJobConfig(getHelixJobName(taskName)); + if (jobConfig == null) { + return Collections.emptyMap(); + } + Map<String, TaskConfig> helixTaskConfigs = jobConfig.getTaskConfigMap(); + Map<String, PinotTaskConfig> taskConfigs = new HashMap<>(helixTaskConfigs.size()); + if (StringUtils.isEmpty(subtaskNames)) { + helixTaskConfigs.forEach((sub, cfg) -> taskConfigs.put(sub, PinotTaskConfig.fromHelixTaskConfig(cfg))); + return taskConfigs; + } + for (String subtaskName : StringUtils.split(subtaskNames, ',')) { + TaskConfig taskConfig = helixTaskConfigs.get(subtaskName); + if (taskConfig != null) { + taskConfigs.put(subtaskName, PinotTaskConfig.fromHelixTaskConfig(taskConfig)); + } + } + return taskConfigs; + } + /** * Helper method to return a map of task names to corresponding task state * where the task corresponds to the given Pinot table name. This is used to diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java index d7d1f994f9..b2212bd4db 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java @@ -79,8 +79,8 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest { // Set task timeout in cluster config PinotHelixResourceManager helixResourceManager = _controllerStarter.getHelixResourceManager(); helixResourceManager.getHelixAdmin().setConfig( - new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster( - helixResourceManager.getHelixClusterName()).build(), + new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER) + .forCluster(helixResourceManager.getHelixClusterName()).build(), Collections.singletonMap(TASK_TYPE + MinionConstants.TIMEOUT_MS_KEY_SUFFIX, Long.toString(600_000L))); // Add 3 offline tables, where 2 of them have TestTask enabled @@ -106,9 +106,7 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest { // Wait for at most 10 seconds for Helix to generate the tasks TestUtils.waitForCondition((aVoid) -> { PinotHelixTaskResourceManager.TaskCount taskCount = _helixTaskResourceManager.getTaskCount(task); - return taskCount.getError() == errors - && taskCount.getWaiting() == waiting - && taskCount.getRunning() == running + return taskCount.getError() == errors && taskCount.getWaiting() == waiting && taskCount.getRunning() == running && taskCount.getTotal() == total; }, 10_000L, "Failed to reach expected task count"); } @@ -199,6 +197,10 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest { && controllerMetrics.getValueOfTableGauge(completedGauge, ControllerGauge.TASK_STATUS) == 0, ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller gauges"); + // Task deletion requires the task queue to be stopped, + // so deleting task1 here before resuming the task queue. + assertTrue(_helixTaskResourceManager.getTaskStates(TASK_TYPE).containsKey(task1)); + _helixTaskResourceManager.deleteTask(task1, false); // Resume the task queue, and let the task complete _helixTaskResourceManager.resumeTaskQueue(TASK_TYPE); HOLD.set(false); @@ -206,12 +208,14 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest { // Wait at most 60 seconds for all tasks COMPLETED TestUtils.waitForCondition(input -> { Collection<TaskState> taskStates = _helixTaskResourceManager.getTaskStates(TASK_TYPE).values(); - assertEquals(taskStates.size(), NUM_TASKS); for (TaskState taskState : taskStates) { if (taskState != TaskState.COMPLETED) { return false; } } + // Task deletion happens eventually along with other state transitions. + assertFalse(_helixTaskResourceManager.getTaskStates(TASK_TYPE).containsKey(task1)); + assertEquals(taskStates.size(), (NUM_TASKS - 1)); assertTrue(TASK_START_NOTIFIED.get()); assertTrue(TASK_SUCCESS_NOTIFIED.get()); assertTrue(TASK_CANCELLED_NOTIFIED.get()); @@ -223,7 +227,7 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest { TestUtils.waitForCondition( input -> controllerMetrics.getValueOfTableGauge(inProgressGauge, ControllerGauge.TASK_STATUS) == 0 && controllerMetrics.getValueOfTableGauge(stoppedGauge, ControllerGauge.TASK_STATUS) == 0 - && controllerMetrics.getValueOfTableGauge(completedGauge, ControllerGauge.TASK_STATUS) == NUM_TASKS, + && controllerMetrics.getValueOfTableGauge(completedGauge, ControllerGauge.TASK_STATUS) == (NUM_TASKS - 1), ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller gauges"); // Delete the task queue --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org