This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 767a149 Add endpoint to check minion task status for a single task. (#7353) 767a149 is described below commit 767a149773a27fb38fadfd1448fd6162dff510ac Author: Ramakrishna Baratam <ramabara...@gmail.com> AuthorDate: Tue Aug 24 10:00:51 2021 -0700 Add endpoint to check minion task status for a single task. (#7353) Add debug endpoint to get minion task information for a single task. Add endpoint to get count of sub-tasks for each of the tasks for the given task type. Added DateTimeUtils to pinot common. --- .../common/restlet/resources/SegmentErrorInfo.java | 24 +--- .../apache/pinot/common/utils/DateTimeUtils.java | 47 +++++++ .../api/resources/PinotTaskRestletResource.java | 22 ++- .../core/minion/PinotHelixTaskResourceManager.java | 148 +++++++++++++-------- 4 files changed, 161 insertions(+), 80 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/SegmentErrorInfo.java b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/SegmentErrorInfo.java index 29e7257..5819e5c 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/SegmentErrorInfo.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/SegmentErrorInfo.java @@ -22,11 +22,8 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonPropertyOrder; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.Locale; -import java.util.TimeZone; import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.pinot.common.utils.DateTimeUtils; /** @@ -37,17 +34,10 @@ import org.apache.commons.lang3.exception.ExceptionUtils; @JsonPropertyOrder({"timestamp", "errorMessage", "stackTrace"}) // For readability of JSON output public class SegmentErrorInfo { - private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss z"; - private static final SimpleDateFormat SIMPLE_DATE_FORMAT = new SimpleDateFormat(DATE_FORMAT, Locale.getDefault()); - private final String _timestamp; private final String _errorMessage; private final String _stackTrace; - static { - SIMPLE_DATE_FORMAT.setTimeZone(TimeZone.getDefault()); - } - /** * This constructor is specifically for JSON ser/de. * @@ -71,7 +61,7 @@ public class SegmentErrorInfo { * @param exception Exception */ public SegmentErrorInfo(long timestampMs, String errorMessage, Exception exception) { - _timestamp = epochToSDF(timestampMs); + _timestamp = DateTimeUtils.epochToDefaultDateFormat(timestampMs); _errorMessage = errorMessage; _stackTrace = (exception != null) ? ExceptionUtils.getStackTrace(exception) : null; } @@ -87,14 +77,4 @@ public class SegmentErrorInfo { public String getTimestamp() { return _timestamp; } - - /** - * Utility function to convert epoch in millis to SDF of form "yyyy-MM-dd HH:mm:ss z". - * - * @param millisSinceEpoch Time in millis to convert - * @return SDF equivalent - */ - private static String epochToSDF(long millisSinceEpoch) { - return SIMPLE_DATE_FORMAT.format(new Date(millisSinceEpoch)); - } } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/DateTimeUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/DateTimeUtils.java new file mode 100644 index 0000000..bcce064 --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DateTimeUtils.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.utils; + +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Locale; +import java.util.TimeZone; + + +public class DateTimeUtils { + private DateTimeUtils() { + } + + private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss z"; + private static final SimpleDateFormat SIMPLE_DATE_FORMAT = new SimpleDateFormat(DATE_FORMAT, Locale.getDefault()); + + static { + SIMPLE_DATE_FORMAT.setTimeZone(TimeZone.getDefault()); + } + + /** + * Utility function to convert epoch in millis to SDF of form "yyyy-MM-dd HH:mm:ss z". + * + * @param millisSinceEpoch Time in millis to convert + * @return SDF equivalent + */ + public static String epochToDefaultDateFormat(long millisSinceEpoch) { + return SIMPLE_DATE_FORMAT.format(new Date(millisSinceEpoch)); + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java index 69f075b..a36a41f 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java @@ -135,13 +135,31 @@ public class PinotTaskRestletResource { } @GET + @Path("/tasks/{taskType}/taskcounts") + @ApiOperation("Fetch count of sub-tasks for each of the tasks for the given task type") + public Map<String, PinotHelixTaskResourceManager.TaskCount> getTaskCounts( + @ApiParam(value = "Task type", required = true) @PathParam("taskType") String taskType) { + return _pinotHelixTaskResourceManager.getTaskCounts(taskType); + } + + @GET @Path("/tasks/{taskType}/debug") @ApiOperation("Fetch information for all the tasks for the given task type") - public Map<String, PinotHelixTaskResourceManager.TaskDebugInfo> getTaskDebugInfo( + public Map<String, PinotHelixTaskResourceManager.TaskDebugInfo> getTasksDebugInfo( @ApiParam(value = "Task type", required = true) @PathParam("taskType") String taskType, @ApiParam(value = "verbosity (By default, prints for running and error tasks. Value of >0 prints for all tasks)") @DefaultValue("0") @QueryParam("verbosity") int verbosity) { - return _pinotHelixTaskResourceManager.getTaskDebugInfo(taskType, verbosity); + return _pinotHelixTaskResourceManager.getTasksDebugInfo(taskType, verbosity); + } + + @GET + @Path("/tasks/task/{taskName}/debug") + @ApiOperation("Fetch information for the given task name") + public PinotHelixTaskResourceManager.TaskDebugInfo getTaskDebugInfo( + @ApiParam(value = "Task name", required = true) @PathParam("taskName") String taskName, + @ApiParam(value = "verbosity (By default, prints for running and error tasks. Value of >0 prints for all tasks)") + @DefaultValue("0") @QueryParam("verbosity") int verbosity) { + return _pinotHelixTaskResourceManager.getTaskDebugInfo(taskName, verbosity); } @Deprecated diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java index 794fedc..e825dbb 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java @@ -22,16 +22,13 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonPropertyOrder; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.Uninterruptibles; -import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Set; -import java.util.TimeZone; import java.util.TreeMap; import java.util.concurrent.TimeUnit; import org.apache.helix.task.JobConfig; @@ -43,6 +40,7 @@ import org.apache.helix.task.TaskPartitionState; import org.apache.helix.task.TaskState; import org.apache.helix.task.WorkflowConfig; import org.apache.helix.task.WorkflowContext; +import org.apache.pinot.common.utils.DateTimeUtils; import org.apache.pinot.core.minion.PinotTaskConfig; import org.apache.pinot.spi.utils.CommonConstants.Helix; import org.slf4j.Logger; @@ -63,8 +61,6 @@ public class PinotHelixTaskResourceManager { private static final String TASK_QUEUE_PREFIX = "TaskQueue" + TASK_NAME_SEPARATOR; private static final String TASK_PREFIX = "Task" + TASK_NAME_SEPARATOR; - private static final SimpleDateFormat SIMPLE_DATE_FORMAT = - new SimpleDateFormat("yyyy-MM-dd HH:mm:ss z", Locale.getDefault()); private final TaskDriver _taskDriver; @@ -382,6 +378,26 @@ public class PinotHelixTaskResourceManager { } /** + * Fetch count of sub-tasks for each of the tasks for the given taskType. + * + * @param taskType Pinot taskType / Helix JobQueue + * @return Map of Pinot Task Name to TaskCount + */ + public synchronized Map<String, TaskCount> getTaskCounts(String taskType) { + Map<String, TaskCount> taskCounts = new TreeMap<>(); + WorkflowContext workflowContext = _taskDriver.getWorkflowContext(getHelixJobQueueName(taskType)); + if (workflowContext == null) { + return taskCounts; + } + Map<String, TaskState> helixJobStates = workflowContext.getJobStates(); + for (String helixJobName : helixJobStates.keySet()) { + String pinotTaskName = getPinotTaskName(helixJobName); + taskCounts.put(pinotTaskName, getTaskCount(pinotTaskName)); + } + return taskCounts; + } + + /** * Given a taskType, helper method to debug all the HelixJobs for the taskType. * For each of the HelixJobs, collects status of the (sub)tasks in the taskbatch. * @@ -390,67 +406,87 @@ public class PinotHelixTaskResourceManager { * If verbosity > 0, shows details for all tasks. * @return Map of Pinot Task Name to TaskDebugInfo. TaskDebugInfo contains details for subtasks. */ - public synchronized Map<String, TaskDebugInfo> getTaskDebugInfo(String taskType, int verbosity) { - Map<String, TaskDebugInfo> taskDebugInfos = new TreeMap<String, TaskDebugInfo>(); + public synchronized Map<String, TaskDebugInfo> getTasksDebugInfo(String taskType, int verbosity) { + Map<String, TaskDebugInfo> taskDebugInfos = new TreeMap<>(); WorkflowContext workflowContext = _taskDriver.getWorkflowContext(getHelixJobQueueName(taskType)); if (workflowContext == null) { return taskDebugInfos; } - boolean showCompleted = verbosity > 0; - SIMPLE_DATE_FORMAT.setTimeZone(TimeZone.getDefault()); Map<String, TaskState> helixJobStates = workflowContext.getJobStates(); - for (Map.Entry<String, TaskState> entry : helixJobStates.entrySet()) { - String helixJobName = entry.getKey(); - String pinotTaskName = getPinotTaskName(helixJobName); - TaskDebugInfo taskDebugInfo = new TaskDebugInfo(); - taskDebugInfo.setTaskState(entry.getValue()); - long jobStartTimeMs = workflowContext.getJobStartTime(helixJobName); - if (jobStartTimeMs > 0) { - taskDebugInfo.setStartTime(SIMPLE_DATE_FORMAT.format(jobStartTimeMs)); + for (String helixJobName : helixJobStates.keySet()) { + taskDebugInfos.put(getPinotTaskName(helixJobName), getTaskDebugInfo(workflowContext, helixJobName, verbosity)); + } + return taskDebugInfos; + } + + /** + * Given a taskName, collects status of the (sub)tasks in the taskName. + * + * @param taskName Pinot taskName + * @param verbosity By default, does not show details for completed tasks. + * If verbosity > 0, shows details for all tasks. + * @return TaskDebugInfo contains details for subtasks in this task batch. + */ + public synchronized TaskDebugInfo getTaskDebugInfo(String taskName, int verbosity) { + String taskType = getTaskType(taskName); + WorkflowContext workflowContext = _taskDriver.getWorkflowContext(getHelixJobQueueName(taskType)); + if (workflowContext == null) { + return null; + } + String helixJobName = getHelixJobName(taskName); + return getTaskDebugInfo(workflowContext, helixJobName, verbosity); + } + + private synchronized TaskDebugInfo getTaskDebugInfo(WorkflowContext workflowContext, String helixJobName, + int verbosity) { + boolean showCompleted = verbosity > 0; + TaskDebugInfo taskDebugInfo = new TaskDebugInfo(); + taskDebugInfo.setTaskState(workflowContext.getJobState(helixJobName)); + long jobStartTimeMs = workflowContext.getJobStartTime(helixJobName); + if (jobStartTimeMs > 0) { + taskDebugInfo.setStartTime(DateTimeUtils.epochToDefaultDateFormat(jobStartTimeMs)); + } + JobContext jobContext = _taskDriver.getJobContext(helixJobName); + if (jobContext != null) { + JobConfig jobConfig = _taskDriver.getJobConfig(helixJobName); + long jobExecutionStartTimeMs = jobContext.getExecutionStartTime(); + if (jobExecutionStartTimeMs > 0) { + taskDebugInfo.setExecutionStartTime(DateTimeUtils.epochToDefaultDateFormat(jobExecutionStartTimeMs)); } - JobContext jobContext = _taskDriver.getJobContext(helixJobName); - if (jobContext != null) { - JobConfig jobConfig = _taskDriver.getJobConfig(helixJobName); - long jobExecutionStartTimeMs = jobContext.getExecutionStartTime(); - if (jobExecutionStartTimeMs > 0) { - taskDebugInfo.setExecutionStartTime(SIMPLE_DATE_FORMAT.format(jobExecutionStartTimeMs)); + Set<Integer> partitionSet = jobContext.getPartitionSet(); + TaskCount subtaskCount = new TaskCount(); + for (int partition : partitionSet) { + // First get the partition's state and update the subtaskCount + TaskPartitionState partitionState = jobContext.getPartitionState(partition); + subtaskCount.addTaskState(partitionState); + // Skip details for COMPLETED tasks + if (!showCompleted && partitionState == TaskPartitionState.COMPLETED) { + continue; } - Set<Integer> partitionSet = jobContext.getPartitionSet(); - TaskCount subtaskCount = new TaskCount(); - for (int partition : partitionSet) { - // First get the partition's state and update the subtaskCount - TaskPartitionState partitionState = jobContext.getPartitionState(partition); - subtaskCount.addTaskState(partitionState); - // Skip details for COMPLETED tasks - if (!showCompleted && partitionState == TaskPartitionState.COMPLETED) { - continue; - } - SubtaskDebugInfo subtaskDebugInfo = new SubtaskDebugInfo(); - String taskIdForPartition = jobContext.getTaskIdForPartition(partition); - subtaskDebugInfo.setTaskId(taskIdForPartition); - subtaskDebugInfo.setState(partitionState); - long subtaskStartTimeMs = jobContext.getPartitionStartTime(partition); - if (subtaskStartTimeMs > 0) { - subtaskDebugInfo.setStartTime(SIMPLE_DATE_FORMAT.format(subtaskStartTimeMs)); - } - long subtaskFinishTimeMs = jobContext.getPartitionFinishTime(partition); - if (subtaskFinishTimeMs > 0) { - subtaskDebugInfo.setFinishTime(SIMPLE_DATE_FORMAT.format(subtaskFinishTimeMs)); - } - subtaskDebugInfo.setParticipant(jobContext.getAssignedParticipant(partition)); - subtaskDebugInfo.setInfo(jobContext.getPartitionInfo(partition)); - TaskConfig helixTaskConfig = jobConfig.getTaskConfig(taskIdForPartition); - if (helixTaskConfig != null) { - PinotTaskConfig pinotTaskConfig = PinotTaskConfig.fromHelixTaskConfig(helixTaskConfig); - subtaskDebugInfo.setTaskConfig(pinotTaskConfig); - } - taskDebugInfo.addSubtaskInfo(subtaskDebugInfo); + SubtaskDebugInfo subtaskDebugInfo = new SubtaskDebugInfo(); + String taskIdForPartition = jobContext.getTaskIdForPartition(partition); + subtaskDebugInfo.setTaskId(taskIdForPartition); + subtaskDebugInfo.setState(partitionState); + long subtaskStartTimeMs = jobContext.getPartitionStartTime(partition); + if (subtaskStartTimeMs > 0) { + subtaskDebugInfo.setStartTime(DateTimeUtils.epochToDefaultDateFormat(subtaskStartTimeMs)); + } + long subtaskFinishTimeMs = jobContext.getPartitionFinishTime(partition); + if (subtaskFinishTimeMs > 0) { + subtaskDebugInfo.setFinishTime(DateTimeUtils.epochToDefaultDateFormat(subtaskFinishTimeMs)); } - taskDebugInfo.setSubtaskCount(subtaskCount); + subtaskDebugInfo.setParticipant(jobContext.getAssignedParticipant(partition)); + subtaskDebugInfo.setInfo(jobContext.getPartitionInfo(partition)); + TaskConfig helixTaskConfig = jobConfig.getTaskConfig(taskIdForPartition); + if (helixTaskConfig != null) { + PinotTaskConfig pinotTaskConfig = PinotTaskConfig.fromHelixTaskConfig(helixTaskConfig); + subtaskDebugInfo.setTaskConfig(pinotTaskConfig); + } + taskDebugInfo.addSubtaskInfo(subtaskDebugInfo); } - taskDebugInfos.put(pinotTaskName, taskDebugInfo); + taskDebugInfo.setSubtaskCount(subtaskCount); } - return taskDebugInfos; + return taskDebugInfo; } /** --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org