Copilot commented on code in PR #17128:
URL: https://github.com/apache/pinot/pull/17128#discussion_r2485515234
##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/TaskMetricsEmitterTest.java:
##########
@@ -336,4 +336,141 @@ public void removeOldTaskTypeAddNewTaskType() {
oneSingleTaskTypeWithTwoTables();
taskType2WithOneTable();
}
+
+ /**
+ * Test for previously in-progress tasks that completed between runs:
+ * Tasks that were in-progress in the previous run but completed before the
current run
+ * should still have their metrics reported in the current run.
+ *
+ * Scenario:
+ * - Run 1: Task "taskCompletedBetweenRuns" is in-progress with 1 error
subtask
+ * - Run 2: Task "taskCompletedBetweenRuns" has completed and is no longer
in getTasksInProgress()
+ *
+ * Expected: Metrics for "taskCompletedBetweenRuns" should still be emitted
in Run 2 by detecting it via
+ * _previousInProgressTasks tracking. The emitter maintains state of tasks
that were in-progress
+ * in the previous execution cycle and includes completed tasks in the
current cycle's metrics.
+ */
+ @Test
+ public void testReportsPreviouslyInProgressTasksThatCompletedBetweenRuns() {
+ String taskType = "SegmentGenerationAndPushTask";
+ String taskName = "taskCompletedBetweenRuns";
+ String tableName = "testTable_OFFLINE";
+
+
Mockito.when(_pinotHelixTaskResourceManager.getTaskTypes()).thenReturn(ImmutableSet.of(taskType));
+
+ // Run 1: Task is in-progress with 1 error subtask
+ Mockito.when(_pinotHelixTaskResourceManager.getTasksInProgress(taskType))
+ .thenReturn(ImmutableSet.of(taskName));
+
+ // Ensure getTasksStartedAfter returns empty for this test (not relevant
for this scenario)
+ Mockito.when(_pinotHelixTaskResourceManager.getTasksStartedAfter(
+ Mockito.eq(taskType), Mockito.anyLong()))
+ .thenReturn(ImmutableSet.of());
+
+ PinotHelixTaskResourceManager.TaskCount taskCount = new
PinotHelixTaskResourceManager.TaskCount();
+ taskCount.addTaskState(TaskPartitionState.TASK_ERROR);
+ Mockito.when(_pinotHelixTaskResourceManager.getTableTaskCount(taskName))
+ .thenReturn(Map.of(tableName, taskCount));
+
+ _taskMetricsEmitter.runTask(null);
+
+ // Verify metrics were emitted in Run 1
+ PinotMetricsRegistry metricsRegistry =
_controllerMetrics.getMetricsRegistry();
+ Assert.assertEquals(((YammerSettableGauge<?>)
metricsRegistry.allMetrics().get(
+ new YammerMetricName(ControllerMetrics.class,
+ "pinot.controller.numMinionSubtasksError." + taskType))
+ .getMetric()).value(), 1L);
Review Comment:
This metric retrieval pattern with multiple type casts is repeated
identically at lines 379-382, 395-398, 438-441, and 471-474. Extract this into
a test helper method like `getMetricValue(String metricName, String taskType)`
to improve maintainability and reduce duplication.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskMetricsEmitter.java:
##########
@@ -90,20 +106,71 @@ protected final void runTask(Properties
periodicTaskProperties) {
TaskCount taskTypeAccumulatedCount = new TaskCount();
Map<String, TaskCount> tableAccumulatedCount = new HashMap<>();
try {
- Set<String> tasksInProgress =
_helixTaskResourceManager.getTasksInProgress(taskType);
- final int numRunningTasks = tasksInProgress.size();
- for (String task : tasksInProgress) {
- Map<String, TaskCount> tableTaskCount =
_helixTaskResourceManager.getTableTaskCount(task);
- tableTaskCount.forEach((tableNameWithType, taskCount) -> {
- taskTypeAccumulatedCount.accumulate(taskCount);
- tableAccumulatedCount.compute(tableNameWithType, (name, count) -> {
- if (count == null) {
- count = new TaskCount();
- }
- count.accumulate(taskCount);
- return count;
+ // Capture the current execution timestamp for this task type
collection cycle
+ long currentExecutionTimestamp = System.currentTimeMillis();
+ // For task types encountered for the first time, use current time
minus one collection cycle
+ // as the initial timestamp to ensure we don't miss tasks that started
recently
+ long previousExecutionTimestamp =
_previousExecutionTimestamps.computeIfAbsent(taskType,
+ k -> currentExecutionTimestamp - _taskMetricsEmitterFrequencyMs);
+
+ // Get currently in-progress tasks
+ Set<String> currentInProgressTasks =
_helixTaskResourceManager.getTasksInProgress(taskType);
+
+ // Get tasks that were in-progress during the previous collection cycle
+ Set<String> previouslyInProgressTasks =
+ _previousInProgressTasks.getOrDefault(taskType,
Collections.emptySet());
+
+ // Start with currently in-progress tasks
+ Set<String> tasksToReport = new HashSet<>(currentInProgressTasks);
+
+ // Include tasks that were in-progress previously but are no longer
in-progress
+ // These tasks completed between collection cycles and need their
final metrics reported
+ for (String taskName : previouslyInProgressTasks) {
+ if (!currentInProgressTasks.contains(taskName)) {
+ LOGGER.debug("Including task {} that completed between collection
cycles for taskType: {}",
+ taskName, taskType);
+ tasksToReport.add(taskName);
+ }
+ }
+
+ // Include tasks that started and completed between collection cycles
+ Set<String> tasksStartedAfter =
_helixTaskResourceManager.getTasksStartedAfter(
+ taskType, previousExecutionTimestamp);
+
+ // Filter out tasks we already know about to avoid duplicates
+ Set<String> shortLivedTasks = new HashSet<>();
+ for (String taskName : tasksStartedAfter) {
+ if (!currentInProgressTasks.contains(taskName) &&
!previouslyInProgressTasks.contains(taskName)) {
+ shortLivedTasks.add(taskName);
+ }
+ }
+
+ if (!shortLivedTasks.isEmpty()) {
+ LOGGER.debug("Including {} short-lived tasks that started and
completed between cycles for taskType: {}",
+ shortLivedTasks.size(), taskType);
+ tasksToReport.addAll(shortLivedTasks);
+ }
+
+ final int numRunningTasks = currentInProgressTasks.size();
+
+ // Process all tasks that need metrics reported
+ for (String task : tasksToReport) {
+ try {
+ Map<String, TaskCount> tableTaskCount =
_helixTaskResourceManager.getTableTaskCount(task);
+ tableTaskCount.forEach((tableNameWithType, taskCount) -> {
+ taskTypeAccumulatedCount.accumulate(taskCount);
+ tableAccumulatedCount.compute(tableNameWithType, (name, count)
-> {
+ if (count == null) {
+ count = new TaskCount();
+ }
+ count.accumulate(taskCount);
+ return count;
+ });
});
- });
+ } catch (Exception e) {
+ LOGGER.warn("Failed to get task count for task: {} of type: {}
(task may have been purged from DAG)",
Review Comment:
The error message references 'DAG' (Directed Acyclic Graph), which is
Helix-specific terminology that may not be immediately clear to all developers
or operators reading logs. Consider clarifying this as 'Helix workflow DAG' or
adding a brief explanation of what it means for a task to be purged from the
DAG.
```suggestion
LOGGER.warn("Failed to get task count for task: {} of type: {}
(task may have been purged from the Helix workflow DAG, i.e., removed from the
workflow graph)",
```
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskMetricsEmitter.java:
##########
@@ -90,20 +106,71 @@ protected final void runTask(Properties
periodicTaskProperties) {
TaskCount taskTypeAccumulatedCount = new TaskCount();
Map<String, TaskCount> tableAccumulatedCount = new HashMap<>();
try {
- Set<String> tasksInProgress =
_helixTaskResourceManager.getTasksInProgress(taskType);
- final int numRunningTasks = tasksInProgress.size();
- for (String task : tasksInProgress) {
- Map<String, TaskCount> tableTaskCount =
_helixTaskResourceManager.getTableTaskCount(task);
- tableTaskCount.forEach((tableNameWithType, taskCount) -> {
- taskTypeAccumulatedCount.accumulate(taskCount);
- tableAccumulatedCount.compute(tableNameWithType, (name, count) -> {
- if (count == null) {
- count = new TaskCount();
- }
- count.accumulate(taskCount);
- return count;
+ // Capture the current execution timestamp for this task type
collection cycle
+ long currentExecutionTimestamp = System.currentTimeMillis();
+ // For task types encountered for the first time, use current time
minus one collection cycle
+ // as the initial timestamp to ensure we don't miss tasks that started
recently
+ long previousExecutionTimestamp =
_previousExecutionTimestamps.computeIfAbsent(taskType,
+ k -> currentExecutionTimestamp - _taskMetricsEmitterFrequencyMs);
+
+ // Get currently in-progress tasks
+ Set<String> currentInProgressTasks =
_helixTaskResourceManager.getTasksInProgress(taskType);
+
+ // Get tasks that were in-progress during the previous collection cycle
+ Set<String> previouslyInProgressTasks =
+ _previousInProgressTasks.getOrDefault(taskType,
Collections.emptySet());
+
+ // Start with currently in-progress tasks
+ Set<String> tasksToReport = new HashSet<>(currentInProgressTasks);
+
+ // Include tasks that were in-progress previously but are no longer
in-progress
+ // These tasks completed between collection cycles and need their
final metrics reported
+ for (String taskName : previouslyInProgressTasks) {
+ if (!currentInProgressTasks.contains(taskName)) {
+ LOGGER.debug("Including task {} that completed between collection
cycles for taskType: {}",
+ taskName, taskType);
+ tasksToReport.add(taskName);
+ }
+ }
+
+ // Include tasks that started and completed between collection cycles
+ Set<String> tasksStartedAfter =
_helixTaskResourceManager.getTasksStartedAfter(
+ taskType, previousExecutionTimestamp);
+
+ // Filter out tasks we already know about to avoid duplicates
+ Set<String> shortLivedTasks = new HashSet<>();
+ for (String taskName : tasksStartedAfter) {
+ if (!currentInProgressTasks.contains(taskName) &&
!previouslyInProgressTasks.contains(taskName)) {
+ shortLivedTasks.add(taskName);
+ }
+ }
Review Comment:
This filtering logic duplicates the membership checks against
currentInProgressTasks and previouslyInProgressTasks. Consider using Set
operations (e.g., removeAll) for cleaner and more efficient code:
`Set<String> shortLivedTasks = new HashSet<>(tasksStartedAfter);`
`shortLivedTasks.removeAll(currentInProgressTasks);`
`shortLivedTasks.removeAll(previouslyInProgressTasks);`
```suggestion
Set<String> shortLivedTasks = new HashSet<>(tasksStartedAfter);
shortLivedTasks.removeAll(currentInProgressTasks);
shortLivedTasks.removeAll(previouslyInProgressTasks);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]