KKcorps commented on code in PR #17128:
URL: https://github.com/apache/pinot/pull/17128#discussion_r2493135613
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java:
##########
@@ -441,24 +441,63 @@ public synchronized Map<String, TaskCount> getTableTaskCount(String taskName) {
* @return Set of task names
*/
public synchronized Set<String> getTasksInProgress(String taskType) {
- WorkflowConfig workflowConfig = _taskDriver.getWorkflowConfig(getHelixJobQueueName(taskType));
+ return getTasksInProgressAndRecent(taskType, 0);
+ }
+
+ /**
+ * Returns a set of Task names (in the form "Task_<taskType>_<uuid>_<timestamp>") that are in progress or not started
+ * yet, and optionally includes recent tasks that started after a given timestamp.
+ * NOTE: For tasks just submitted without the context created, count them as NOT_STARTED.
+ * This method combines in-progress tasks and recent tasks in a single Helix call to avoid duplicate calls.
+ *
+ * @param taskType Task type
+ * @param afterTimestampMs If > 0, also include tasks that started after this timestamp (in milliseconds).
+ * This is used to detect short-lived tasks that started and completed between cycles.
+ * @return Set of task names that are in-progress, and optionally recent tasks that started after the timestamp
+ */
+ public synchronized Set<String> getTasksInProgressAndRecent(String taskType, long afterTimestampMs) {
+ String helixJobQueueName = getHelixJobQueueName(taskType);
+ WorkflowConfig workflowConfig = _taskDriver.getWorkflowConfig(helixJobQueueName);
if (workflowConfig == null) {
return Collections.emptySet();
}
Set<String> helixJobs = workflowConfig.getJobDag().getAllNodes();
if (helixJobs.isEmpty()) {
return Collections.emptySet();
}
- WorkflowContext workflowContext = _taskDriver.getWorkflowContext(getHelixJobQueueName(taskType));
+ WorkflowContext workflowContext = _taskDriver.getWorkflowContext(helixJobQueueName);
if (workflowContext == null) {
- return helixJobs.stream().map(PinotHelixTaskResourceManager::getPinotTaskName).collect(Collectors.toSet());
- } else {
- Map<String, TaskState> helixJobStates = workflowContext.getJobStates();
- return helixJobs.stream().filter(helixJobName -> {
- TaskState taskState = helixJobStates.get(helixJobName);
- return taskState == null || taskState == TaskState.NOT_STARTED || taskState == TaskState.IN_PROGRESS;
- }).map(PinotHelixTaskResourceManager::getPinotTaskName).collect(Collectors.toSet());
+ // If no context, return all jobs as in-progress (backward compatible behavior)
+ Set<String> result = helixJobs.stream()
+ .map(PinotHelixTaskResourceManager::getPinotTaskName)
+ .collect(Collectors.toSet());
+ // If timestamp is specified, we can't filter by start time without context, so return all
+ return result;
+ }
+
+ Set<String> result = new HashSet<>();
+ Map<String, TaskState> helixJobStates = workflowContext.getJobStates();
+ Map<String, Long> jobStartTimes = afterTimestampMs > 0 ? workflowContext.getJobStartTimes() : null;
Review Comment:
What if `workflowContext.getJobStartTimes()` is null as well here? It can happen if the task queue hasn't been created yet.
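One possible way to handle this, if the start-time map can indeed be null: treat a missing map the same as an empty one, so the "recent" branch simply finds nothing instead of throwing. The sketch below is a minimal, self-contained approximation of the filtering the javadoc describes. It assumes plain maps in place of Helix's `WorkflowContext`; the local `TaskState` enum stands in for `org.apache.helix.task.TaskState`, and `filterInProgressAndRecent` is a hypothetical helper, not code from this PR. It returns Helix job names directly rather than mapping them through `getPinotTaskName`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class InProgressAndRecentFilter {

  // Hypothetical stand-in for org.apache.helix.task.TaskState (only the values used here).
  enum TaskState { NOT_STARTED, IN_PROGRESS, COMPLETED, FAILED }

  /**
   * Hypothetical helper: keeps jobs with no state, NOT_STARTED or IN_PROGRESS, plus (only when
   * afterTimestampMs > 0) jobs whose recorded start time is strictly later than afterTimestampMs.
   * Null maps are treated as empty, so a missing start-time map means "no recent jobs"
   * rather than a NullPointerException.
   */
  static Set<String> filterInProgressAndRecent(Set<String> helixJobs,
      Map<String, TaskState> jobStates, Map<String, Long> jobStartTimes, long afterTimestampMs) {
    Map<String, TaskState> states =
        jobStates != null ? jobStates : Collections.<String, TaskState>emptyMap();
    Map<String, Long> startTimes = (afterTimestampMs > 0 && jobStartTimes != null)
        ? jobStartTimes : Collections.<String, Long>emptyMap();
    Set<String> result = new HashSet<>();
    for (String job : helixJobs) {
      TaskState state = states.get(job);
      if (state == null || state == TaskState.NOT_STARTED || state == TaskState.IN_PROGRESS) {
        // No state recorded yet, or still pending/running: always reported.
        result.add(job);
        continue;
      }
      // Other states: report the job only if it started after the cutoff (short-lived "recent" task).
      Long startTime = startTimes.get(job);
      if (startTime != null && startTime > afterTimestampMs) {
        result.add(job);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Set<String> jobs = new HashSet<>(Arrays.asList("running", "finishedRecently"));
    Map<String, TaskState> states = new HashMap<>();
    states.put("running", TaskState.IN_PROGRESS);
    states.put("finishedRecently", TaskState.COMPLETED);
    // Start-time map missing entirely: no NPE, only the running job is returned.
    System.out.println(filterInProgressAndRecent(jobs, states, null, 1_000L)); // prints [running]
  }
}
```

Defaulting to an empty map leaves `getTasksInProgress(taskType)` (which passes 0) unaffected, and makes recent-task detection degrade gracefully instead of failing the whole call.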
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]