This is an automated email from the ASF dual-hosted git repository.

manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 862a931667 Improve minion observer stats to capture more granular stages of minion task execution (#15118)
862a931667 is described below

commit 862a9316670ccbf48f33af5ce780469d84dd5440
Author: Shounak kulkarni <shounakmk...@gmail.com>
AuthorDate: Tue Mar 4 09:36:30 2025 +0530

    Improve minion observer stats to capture more granular stages of minion task execution (#15118)
    
    * Add more stats to MinionTaskBaseObserverStats for better tracking
    
    * Add granular tracking to SegmentProcessorFramework
    
    * lint fixes
    
    * Add minion API to get the subtask stats
    
    * test fix
    
    * fixes
    
    * add comments
    
    * Add comments on _stageTimes
---
 .../framework/SegmentProcessorFramework.java       |  35 +++++--
 .../segment/processing/mapper/SegmentMapper.java   |  10 +-
 .../api/resources/PinotTaskProgressResource.java   |  39 +++++++
 .../pinot/minion/event/MinionProgressObserver.java |  73 +++++++++++--
 .../spi/tasks/MinionTaskBaseObserverStats.java     | 115 +++++++++++++++++++--
 .../spi/tasks/MinionTaskBaseObserverStatsTest.java |  43 +++++---
 .../resources/observer_stats_test_payload.json     |  26 +++++
 7 files changed, 302 insertions(+), 39 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
index a6d3f74042..596eb3266e 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
@@ -43,6 +43,7 @@ import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderFileConfig;
 import org.apache.pinot.spi.recordtransformer.RecordTransformer;
+import org.apache.pinot.spi.tasks.MinionTaskBaseObserverStats;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,6 +60,9 @@ import org.slf4j.LoggerFactory;
  */
 public class SegmentProcessorFramework {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentProcessorFramework.class);
+  public static final String MAP_STAGE = "MAP";
+  public static final String REDUCE_STAGE = "REDUCE";
+  public static final String GENERATE_STAGE = "GENERATE_SEGMENT";
 
   private final List<RecordReaderFileConfig> _recordReaderFileConfigs;
   private final List<RecordTransformer> _customRecordTransformers;
@@ -157,9 +161,9 @@ public class SegmentProcessorFramework {
     int numRecordReaders = _recordReaderFileConfigs.size();
     int nextRecordReaderIndexToBeProcessed = 0;
     int iterationCount = 1;
-    Consumer<Object> observer = _segmentProcessorConfig.getProgressObserver();
     boolean isMapperOutputSizeThresholdEnabled =
         
_segmentProcessorConfig.getSegmentConfig().getIntermediateFileSizeThreshold() 
!= Long.MAX_VALUE;
+    String logMessage;
 
     while (nextRecordReaderIndexToBeProcessed < numRecordReaders) {
       // Initialise the mapper. Eliminate the record readers that have been 
processed in the previous iterations.
@@ -168,34 +172,42 @@ public class SegmentProcessorFramework {
 
       // Log start of iteration details only if intermediate file size 
threshold is set.
       if (isMapperOutputSizeThresholdEnabled) {
-        String logMessage =
+        logMessage =
             String.format("Starting iteration %d with %d record readers. 
Starting index = %d, end index = %d",
                 iterationCount,
                 
_recordReaderFileConfigs.subList(nextRecordReaderIndexToBeProcessed, 
numRecordReaders).size(),
                 nextRecordReaderIndexToBeProcessed + 1, numRecordReaders);
         LOGGER.info(logMessage);
-        observer.accept(logMessage);
+        logToObserver(MAP_STAGE, logMessage);
       }
 
       // Map phase.
       long mapStartTimeInMs = System.currentTimeMillis();
+      logToObserver(MAP_STAGE, "Starting Map phase for iteration " + 
iterationCount);
       Map<String, GenericRowFileManager> partitionToFileManagerMap = 
mapper.map();
 
       // Log the time taken to map.
-      LOGGER.info("Finished iteration {} in {}ms", iterationCount, 
System.currentTimeMillis() - mapStartTimeInMs);
+      logMessage = "Finished Map phase for iteration " + iterationCount + " in 
"
+          + (System.currentTimeMillis() - mapStartTimeInMs) + "ms";
+      LOGGER.info(logMessage);
+      logToObserver(MAP_STAGE, logMessage);
 
       // Check for mapper output files, if no files are generated, skip the 
reducer phase and move on to the next
       // iteration.
       if (partitionToFileManagerMap.isEmpty()) {
-        LOGGER.info("No mapper output files generated, skipping reduce phase");
+        logMessage = "No mapper output files generated, skipping reduce phase";
+        LOGGER.info(logMessage);
+        logToObserver(MAP_STAGE, logMessage);
         nextRecordReaderIndexToBeProcessed = 
getNextRecordReaderIndexToBeProcessed(nextRecordReaderIndexToBeProcessed);
         continue;
       }
 
       // Reduce phase.
+      logToObserver(REDUCE_STAGE, "Starting Reduce phase for iteration " + 
iterationCount);
       doReduce(partitionToFileManagerMap);
 
       // Segment creation phase. Add the created segments to the final list.
+      logToObserver(GENERATE_STAGE, "Generating segments for iteration " + 
iterationCount);
       outputSegmentDirs.addAll(generateSegment(partitionToFileManagerMap));
 
       // Store the starting index of the record readers that were processed in 
this iteration for logging purposes.
@@ -213,7 +225,6 @@ public class SegmentProcessorFramework {
 
         // We are sure that the last RecordReader is completely processed in 
the last iteration else it may or may not
         // have completed processing. Log it accordingly.
-        String logMessage;
         if (nextRecordReaderIndexToBeProcessed == numRecordReaders) {
           logMessage = String.format("Finished processing all of %d 
RecordReaders", numRecordReaders);
         } else {
@@ -222,16 +233,22 @@ public class SegmentProcessorFramework {
                   + "iteration %d", startingProcessedRecordReaderIndex + 1, 
boundaryIndexToLog,
               nextRecordReaderIndexToBeProcessed + 1, numRecordReaders, 
iterationCount);
         }
-
-        observer.accept(logMessage);
         LOGGER.info(logMessage);
+        logToObserver(GENERATE_STAGE, logMessage);
       }
-
       iterationCount++;
     }
     return outputSegmentDirs;
   }
 
+  private void logToObserver(String stage, String logMessage) {
+    _segmentProcessorConfig.getProgressObserver()
+        .accept(new MinionTaskBaseObserverStats.StatusEntry.Builder()
+            .withStage(stage)
+            .withStatus(logMessage)
+            .build());
+  }
+
   protected SegmentMapper getSegmentMapper(List<RecordReaderFileConfig> 
recordReaderFileConfigs) {
     if (_transformPipeline != null) {
       return new SegmentMapper(recordReaderFileConfigs, _transformPipeline, 
_segmentProcessorConfig, _mapperOutputDir);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
index 3ec0a539c7..2697846f79 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
@@ -48,6 +48,7 @@ import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderFileConfig;
 import org.apache.pinot.spi.recordtransformer.RecordTransformer;
+import org.apache.pinot.spi.tasks.MinionTaskBaseObserverStats;
 import org.apache.pinot.spi.utils.StringUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -187,10 +188,15 @@ public class SegmentMapper {
           writeRecord(transformedRow);
         }
       } catch (Exception e) {
+        String logMessage = "Caught exception while reading data.";
+        observer.accept(new MinionTaskBaseObserverStats.StatusEntry.Builder()
+            .withLevel(MinionTaskBaseObserverStats.StatusEntry.LogLevel.ERROR)
+            .withStatus(logMessage + " Reason: " + e.getMessage())
+            .build());
         if (!continueOnError) {
-          throw new RuntimeException("Caught exception while reading data", e);
+          throw new RuntimeException(logMessage, e);
         } else {
-          LOGGER.debug("Caught exception while reading data", e);
+          LOGGER.debug(logMessage, e);
           continue;
         }
       }
diff --git 
a/pinot-minion/src/main/java/org/apache/pinot/minion/api/resources/PinotTaskProgressResource.java
 
b/pinot-minion/src/main/java/org/apache/pinot/minion/api/resources/PinotTaskProgressResource.java
index ac91835c76..95a9a8efce 100644
--- 
a/pinot-minion/src/main/java/org/apache/pinot/minion/api/resources/PinotTaskProgressResource.java
+++ 
b/pinot-minion/src/main/java/org/apache/pinot/minion/api/resources/PinotTaskProgressResource.java
@@ -45,6 +45,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.minion.event.MinionEventObserver;
 import org.apache.pinot.minion.event.MinionEventObservers;
 import org.apache.pinot.minion.event.MinionTaskState;
+import org.apache.pinot.spi.tasks.MinionTaskBaseObserverStats;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.slf4j.Logger;
@@ -143,4 +144,42 @@ public class PinotTaskProgressResource {
           .build());
     }
   }
+
+  @GET
+  @Path("/tasks/subtask/progressStats")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation("Get task progress stats tracked for the given subtasks")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, 
message = "Internal server error")
+  })
+  public String getSubtaskProgressStats(
+      @ApiParam(value = "Sub task name") @QueryParam("subtaskName") String 
subtaskName) {
+    try {
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("Getting progress stats for subtask: {}", subtaskName);
+      }
+      Map<String, MinionTaskBaseObserverStats> progressStatsMap = new 
HashMap<>();
+      MinionEventObserver observer = 
MinionEventObservers.getInstance().getMinionEventObserver(subtaskName);
+      if (observer == null) {
+        if (LOGGER.isDebugEnabled()) {
+          LOGGER.debug("MinionEventObserver does not exist for subtask: {}", 
subtaskName);
+        }
+        return JsonUtils.objectToString(progressStatsMap);
+      }
+      MinionTaskBaseObserverStats progressStats = observer.getProgressStats();
+      if (progressStats != null) {
+        progressStats.setProgressLogs(null);
+        progressStatsMap.put(subtaskName, progressStats);
+        if (LOGGER.isDebugEnabled()) {
+          LOGGER.debug("Got subtasks progress stats: {}", progressStats);
+        }
+      }
+      return JsonUtils.objectToString(progressStatsMap);
+    } catch (Exception e) {
+      throw new 
WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(
+          String.format("Failed to get task progress stats for subtask: %s due 
to error: %s",
+              subtaskName, e.getMessage()))
+          .build());
+    }
+  }
 }
diff --git 
a/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionProgressObserver.java
 
b/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionProgressObserver.java
index a68011207c..7f3b11870f 100644
--- 
a/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionProgressObserver.java
+++ 
b/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionProgressObserver.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Deque;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.lang3.exception.ExceptionUtils;
@@ -38,7 +39,7 @@ import org.slf4j.LoggerFactory;
 public class MinionProgressObserver extends DefaultMinionEventObserver {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(MinionProgressObserver.class);
 
-  protected MinionTaskBaseObserverStats _taskProgressStats = new 
MinionTaskBaseObserverStats();
+  protected final MinionTaskBaseObserverStats _taskProgressStats = new 
MinionTaskBaseObserverStats();
   protected String _taskId;
 
   @Override
@@ -50,6 +51,7 @@ public class MinionProgressObserver extends 
DefaultMinionEventObserver {
     addStatus(new MinionTaskBaseObserverStats.StatusEntry.Builder()
         .withTs(_taskProgressStats.getStartTimestamp())
         .withStatus("Task started")
+        .withStage(MinionTaskState.IN_PROGRESS.name())
         .build());
     super.notifyTaskStart(pinotTaskConfig);
   }
@@ -62,14 +64,29 @@ public class MinionProgressObserver extends 
DefaultMinionEventObserver {
    */
   @Override
   public synchronized void notifyProgress(PinotTaskConfig pinotTaskConfig, 
@Nullable Object progress) {
-    if (LOGGER.isDebugEnabled()) {
-      LOGGER.debug("Update progress: {} for task: {}", progress, 
pinotTaskConfig.getTaskId());
-    }
+    String progressMessage = null;
+    MinionTaskBaseObserverStats.StatusEntry statusEntry = null;
     _taskProgressStats.setCurrentState(MinionTaskState.IN_PROGRESS.name());
-    addStatus(new MinionTaskBaseObserverStats.StatusEntry.Builder()
-        .withTs(System.currentTimeMillis())
-        .withStatus((progress == null) ? "" : progress.toString())
-        .build());
+    if (progress instanceof MinionTaskBaseObserverStats.StatusEntry) {
+      statusEntry = (MinionTaskBaseObserverStats.StatusEntry) progress;
+      progressMessage = statusEntry.getStatus();
+    } else if (progress instanceof MinionTaskBaseObserverStats) {
+      MinionTaskBaseObserverStats stats = (MinionTaskBaseObserverStats) 
progress;
+      // Only one progress log must be recorded at once and should not be 
bulked
+      if (stats.getProgressLogs() != null) {
+        statusEntry = stats.getProgressLogs().pollFirst();
+        progressMessage = statusEntry != null ? statusEntry.getStatus() : null;
+      }
+    } else if (progress != null) {
+      progressMessage = progress.toString();
+      statusEntry = new MinionTaskBaseObserverStats.StatusEntry.Builder()
+          .withStatus(progressMessage)
+          .build();
+    }
+    if (LOGGER.isDebugEnabled() && progressMessage != null) {
+      LOGGER.debug("Update progress: {} for task: {}", progressMessage, 
pinotTaskConfig.getTaskId());
+    }
+    addStatus(statusEntry);
     super.notifyProgress(pinotTaskConfig, progress);
   }
 
@@ -93,9 +110,11 @@ public class MinionProgressObserver extends 
DefaultMinionEventObserver {
   public synchronized void notifyTaskSuccess(PinotTaskConfig pinotTaskConfig, 
@Nullable Object executionResult) {
     long endTs = System.currentTimeMillis();
     _taskProgressStats.setCurrentState(MinionTaskState.SUCCEEDED.name());
+    _taskProgressStats.setEndTimestamp(endTs);
     addStatus(new MinionTaskBaseObserverStats.StatusEntry.Builder()
         .withTs(endTs)
         .withStatus("Task succeeded in " + (endTs - 
_taskProgressStats.getStartTimestamp()) + "ms")
+        .withStage(MinionTaskState.SUCCEEDED.name())
         .build());
     super.notifyTaskSuccess(pinotTaskConfig, executionResult);
   }
@@ -104,10 +123,12 @@ public class MinionProgressObserver extends 
DefaultMinionEventObserver {
   public synchronized void notifyTaskCancelled(PinotTaskConfig 
pinotTaskConfig) {
     long endTs = System.currentTimeMillis();
     _taskProgressStats.setCurrentState(MinionTaskState.CANCELLED.name());
+    _taskProgressStats.setEndTimestamp(endTs);
     addStatus(new MinionTaskBaseObserverStats.StatusEntry.Builder()
         .withTs(endTs)
         .withLevel(MinionTaskBaseObserverStats.StatusEntry.LogLevel.WARN)
         .withStatus("Task got cancelled after " + (endTs - 
_taskProgressStats.getStartTimestamp()) + "ms")
+        .withStage(MinionTaskState.CANCELLED.name())
         .build());
     super.notifyTaskCancelled(pinotTaskConfig);
   }
@@ -116,11 +137,13 @@ public class MinionProgressObserver extends 
DefaultMinionEventObserver {
   public synchronized void notifyTaskError(PinotTaskConfig pinotTaskConfig, 
Exception e) {
     long endTs = System.currentTimeMillis();
     _taskProgressStats.setCurrentState(MinionTaskState.ERROR.name());
+    _taskProgressStats.setEndTimestamp(endTs);
     addStatus(new MinionTaskBaseObserverStats.StatusEntry.Builder()
         .withTs(endTs)
         .withLevel(MinionTaskBaseObserverStats.StatusEntry.LogLevel.ERROR)
         .withStatus("Task failed in " + (endTs - 
_taskProgressStats.getStartTimestamp()) + "ms with error: "
             + ExceptionUtils.getStackTrace(e))
+        .withStage(MinionTaskState.ERROR.name())
         .build());
     super.notifyTaskError(pinotTaskConfig, e);
   }
@@ -139,6 +162,37 @@ public class MinionProgressObserver extends 
DefaultMinionEventObserver {
   }
 
   private synchronized void addStatus(MinionTaskBaseObserverStats.StatusEntry 
statusEntry) {
+    if (statusEntry == null) {
+      // if no status entry provided, only update the task progress if the 
_taskProgressStats has updated values
+      MinionTaskBaseObserverStats minionTaskObserverStats = 
_observerStorageManager.getTaskProgress(_taskId);
+      if (minionTaskObserverStats != null && 
!minionTaskObserverStats.equals(_taskProgressStats)) {
+        _observerStorageManager.setTaskProgress(_taskId, new 
MinionTaskBaseObserverStats(_taskProgressStats));
+      }
+      return;
+    }
+    String incomingStage = statusEntry.getStage();
+    if (_taskProgressStats.getCurrentStage() == null) {
+      // typically incomingStage won't be null when current stage is also null 
as notifyTaskStart is the first
+      // that gets called during task execution.
+      // This handling is mostly for testing purpose
+      _taskProgressStats.setCurrentStage(incomingStage != null ? incomingStage 
: MinionTaskState.UNKNOWN.name());
+    }
+    String currentStage = _taskProgressStats.getCurrentStage();
+    Map<String, MinionTaskBaseObserverStats.Timer> stageTimes = 
_taskProgressStats.getStageTimes();
+    if (incomingStage != null && !currentStage.equals(incomingStage)) {
+      // stage transition
+      stageTimes.get(currentStage).stop();
+      currentStage = incomingStage;
+      _taskProgressStats.setCurrentStage(currentStage);
+    } else {
+      // carry forward current stage if stage not specified
+      statusEntry.updateStage(currentStage);
+    }
+    if (!stageTimes.containsKey(currentStage)) {
+      stageTimes.put(currentStage, new MinionTaskBaseObserverStats.Timer());
+      stageTimes.get(currentStage).start();
+    }
+
     MinionTaskBaseObserverStats minionTaskObserverStats = 
_observerStorageManager.getTaskProgress(_taskId);
     Deque<MinionTaskBaseObserverStats.StatusEntry> progressLogs;
     if (minionTaskObserverStats != null) {
@@ -146,8 +200,7 @@ public class MinionProgressObserver extends 
DefaultMinionEventObserver {
     } else {
       progressLogs = new LinkedList<>();
     }
-    progressLogs.add(statusEntry);
-
+    progressLogs.offer(statusEntry);
     _observerStorageManager.setTaskProgress(_taskId, new 
MinionTaskBaseObserverStats(_taskProgressStats)
         .setProgressLogs(progressLogs));
   }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/tasks/MinionTaskBaseObserverStats.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/tasks/MinionTaskBaseObserverStats.java
index d21b35fb1c..37580cc08c 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/tasks/MinionTaskBaseObserverStats.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/tasks/MinionTaskBaseObserverStats.java
@@ -18,10 +18,14 @@
  */
 package org.apache.pinot.spi.tasks;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import java.util.Deque;
+import java.util.HashMap;
 import java.util.LinkedList;
+import java.util.Map;
 import java.util.Objects;
 import org.apache.pinot.spi.utils.JsonUtils;
 
@@ -34,8 +38,13 @@ import org.apache.pinot.spi.utils.JsonUtils;
  */
 public class MinionTaskBaseObserverStats {
   protected String _taskId;
+  protected String _currentStage;
   protected String _currentState;
   protected long _startTimestamp;
+  protected long _endTimestamp;
+  // A map to keep track of the time spent on each stage of a task execution
+  // This stat will be managed by the observer and executor should not worry 
about maintaining it
+  protected Map<String, Timer> _stageTimes = new HashMap<>();
   protected Deque<StatusEntry> _progressLogs = new LinkedList<>();
 
   public MinionTaskBaseObserverStats() {
@@ -44,7 +53,10 @@ public class MinionTaskBaseObserverStats {
   public MinionTaskBaseObserverStats(MinionTaskBaseObserverStats from) {
     _taskId = from.getTaskId();
     _currentState = from.getCurrentState();
+    _currentStage = from.getCurrentStage();
     _startTimestamp = from.getStartTimestamp();
+    _endTimestamp = from.getEndTimestamp();
+    _stageTimes = from.getStageTimes();
     _progressLogs = new LinkedList<>(from.getProgressLogs());
   }
 
@@ -66,6 +78,15 @@ public class MinionTaskBaseObserverStats {
     return this;
   }
 
+  public long getEndTimestamp() {
+    return _endTimestamp;
+  }
+
+  public MinionTaskBaseObserverStats setEndTimestamp(long endTimestamp) {
+    _endTimestamp = endTimestamp;
+    return this;
+  }
+
   public String getCurrentState() {
     return _currentState;
   }
@@ -75,6 +96,24 @@ public class MinionTaskBaseObserverStats {
     return this;
   }
 
+  public String getCurrentStage() {
+    return _currentStage;
+  }
+
+  public MinionTaskBaseObserverStats setCurrentStage(String currentStage) {
+    _currentStage = currentStage;
+    return this;
+  }
+
+  public Map<String, Timer> getStageTimes() {
+    return _stageTimes;
+  }
+
+  public MinionTaskBaseObserverStats setStageTimes(Map<String, Timer> 
stageTimes) {
+    _stageTimes = stageTimes;
+    return this;
+  }
+
   public Deque<StatusEntry> getProgressLogs() {
     return _progressLogs;
   }
@@ -103,13 +142,56 @@ public class MinionTaskBaseObserverStats {
       return false;
     }
     MinionTaskBaseObserverStats stats = (MinionTaskBaseObserverStats) o;
-    return _startTimestamp == stats.getStartTimestamp() && 
_taskId.equals(stats.getTaskId())
-        && _currentState.equals(stats.getCurrentState());
+    return _startTimestamp == stats.getStartTimestamp() && _endTimestamp == 
stats.getEndTimestamp()
+        && _taskId.equals(stats.getTaskId()) && 
_currentState.equals(stats.getCurrentState())
+        && _currentStage.equals(stats.getCurrentStage());
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(_taskId, _currentState, _startTimestamp);
+    return Objects.hash(_taskId, _currentStage, _currentState, 
_startTimestamp, _endTimestamp);
+  }
+
+  public static class Timer {
+    private long _totalTimeMs = 0;
+    private long _startTimeMs = 0;
+    private long _resumeTimeMs = 0;
+
+    public Timer() {
+    }
+
+    public Timer(@JsonProperty("totalTimeMs") long totalTimeMs,
+        @JsonProperty("startTimeMs") long startTimeMs,
+        @JsonProperty("resumeTimeMs") long resumeTimeMs) {
+      _totalTimeMs = totalTimeMs;
+      _startTimeMs = startTimeMs;
+      _resumeTimeMs = resumeTimeMs;
+    }
+
+    public void start() {
+      _startTimeMs = System.currentTimeMillis();
+      _resumeTimeMs = _startTimeMs;
+    }
+
+    public void stop() {
+      if (_resumeTimeMs != 0) {
+        _totalTimeMs += System.currentTimeMillis() - _resumeTimeMs;
+        _resumeTimeMs = 0;
+      }
+    }
+
+    public long getStartTimeMs() {
+      return _startTimeMs;
+    }
+
+    public long getTotalTimeMs() {
+      return _totalTimeMs;
+    }
+
+    @JsonIgnore
+    public long getResumeTimeMs() {
+      return _resumeTimeMs;
+    }
   }
 
   @JsonDeserialize(builder = StatusEntry.Builder.class)
@@ -117,11 +199,13 @@ public class MinionTaskBaseObserverStats {
     private final long _ts;
     private final LogLevel _level;
     private final String _status;
+    private String _stage;
 
-    public StatusEntry(long ts, LogLevel level, String status) {
+    private StatusEntry(long ts, LogLevel level, String status, String stage) {
       _ts = ts;
       _level = level != null ? level : LogLevel.INFO;
       _status = status;
+      _stage = stage;
     }
 
     public long getTs() {
@@ -136,14 +220,28 @@ public class MinionTaskBaseObserverStats {
       return _level;
     }
 
+    public String getStage() {
+      return _stage;
+    }
+
+    public boolean updateStage(String stage) {
+      if (_stage == null) {
+        _stage = stage;
+        return true;
+      }
+      return false;
+    }
+
     @Override
     public String toString() {
-      return "{\"ts\" : " + _ts + ", \"level\" : \"" + _level + "\", 
\"status\" : \"" + _status + "\"}";
+      return "{\"ts\" : " + _ts + ", \"level\" : \"" + _level + "\", \"stage\" 
: \"" + _stage
+          + "\", \"status\" : \"" + _status + "\"}";
     }
 
     public static class Builder {
       private long _ts;
       private LogLevel _level = LogLevel.INFO;
+      private String _stage;
       private String _status;
 
       public Builder withTs(long ts) {
@@ -156,6 +254,11 @@ public class MinionTaskBaseObserverStats {
         return this;
       }
 
+      public Builder withStage(String stage) {
+        _stage = stage;
+        return this;
+      }
+
       public Builder withStatus(String status) {
         _status = status;
         return this;
@@ -165,7 +268,7 @@ public class MinionTaskBaseObserverStats {
         if (_ts == 0) {
           _ts = System.currentTimeMillis();
         }
-        return new StatusEntry(_ts, _level, _status);
+        return new StatusEntry(_ts, _level, _status, _stage);
       }
     }
 
diff --git 
a/pinot-spi/src/test/java/org/apache/pinot/spi/tasks/MinionTaskBaseObserverStatsTest.java
 
b/pinot-spi/src/test/java/org/apache/pinot/spi/tasks/MinionTaskBaseObserverStatsTest.java
index 126e9d43e3..f96dc6dfb7 100644
--- 
a/pinot-spi/src/test/java/org/apache/pinot/spi/tasks/MinionTaskBaseObserverStatsTest.java
+++ 
b/pinot-spi/src/test/java/org/apache/pinot/spi/tasks/MinionTaskBaseObserverStatsTest.java
@@ -19,7 +19,13 @@
 package org.apache.pinot.spi.tasks;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.File;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Objects;
+import org.apache.commons.io.FileUtils;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -29,28 +35,41 @@ public class MinionTaskBaseObserverStatsTest {
   private static final String TEST_PROPERTY = "some test property";
   private static final String TASK_ID = "randomString";
   private static final String CURRENT_STATE = "IN_PROGRESS";
-  private static final long TS = System.currentTimeMillis();
+  private static final String CURRENT_STAGE = "testStage";
+  private static final long TS = 1740407875728L;
   private static final String STATUS = "task status";
 
   @Test
   public void testSerDeser()
-      throws JsonProcessingException {
+      throws Exception {
+    Map<String, MinionTaskBaseObserverStats.Timer> stageTimes = new 
HashMap<>();
+    stageTimes.put(CURRENT_STATE, new MinionTaskBaseObserverStats.Timer());
+    stageTimes.put(CURRENT_STAGE, new MinionTaskBaseObserverStats.Timer());
     TestObserverStats stats = (TestObserverStats) new TestObserverStats()
         .setTestProperty(TEST_PROPERTY)
         .setTaskId(TASK_ID)
         .setCurrentState(CURRENT_STATE)
-        .setStartTimestamp(TS);
-    stats.getProgressLogs().offer(new MinionTaskBaseObserverStats.StatusEntry(
-        TS, MinionTaskBaseObserverStats.StatusEntry.LogLevel.INFO, STATUS));
+        .setCurrentStage(CURRENT_STAGE)
+        .setStartTimestamp(TS)
+        .setEndTimestamp(TS)
+        .setStageTimes(stageTimes);
+    stats.getProgressLogs().offer(new 
MinionTaskBaseObserverStats.StatusEntry.Builder()
+            .withTs(TS)
+            .withLevel(MinionTaskBaseObserverStats.StatusEntry.LogLevel.INFO)
+            .withStatus(STATUS)
+            .withStage("test")
+        .build());
     String statsString = getTestObjectString();
     TestObserverStats stats2 = stats.fromJsonString(statsString);
     Assert.assertEquals(stats2, stats);
   }
 
-  private String getTestObjectString() {
-    return "{\"testProperty\":\"" + TEST_PROPERTY + "\",\"startTimestamp\":" + 
TS
-        + ",\"currentState\":\"" + CURRENT_STATE + 
"\",\"progressLogs\":[{\"level\":\"INFO\",\"status\":\"" + STATUS
-        + "\",\"ts\":" + TS + "}],\"taskId\":\"" + TASK_ID + "\"}";
+  private String getTestObjectString()
+      throws Exception {
+    URL resource = MinionTaskBaseObserverStatsTest.class.getClassLoader()
+        .getResource("observer_stats_test_payload.json");
+    Assert.assertNotNull(resource);
+    return FileUtils.readFileToString(new File(resource.toURI()), 
StandardCharsets.UTF_8);
   }
 
   public static class TestObserverStats extends MinionTaskBaseObserverStats {
@@ -74,8 +93,8 @@ public class MinionTaskBaseObserverStatsTest {
     @Override
     public boolean equals(Object o) {
       if (this == o) {
-      return true;
-    }
+        return true;
+      }
       if (o == null || getClass() != o.getClass()) {
         return false;
       }
@@ -85,7 +104,7 @@ public class MinionTaskBaseObserverStatsTest {
 
     @Override
     public int hashCode() {
-      return Objects.hash(getTaskId(), getCurrentState(), getStartTimestamp(), 
getTestProperty());
+      return Objects.hash(super.hashCode(), _testProperty);
     }
   }
 }
diff --git a/pinot-spi/src/test/resources/observer_stats_test_payload.json 
b/pinot-spi/src/test/resources/observer_stats_test_payload.json
new file mode 100644
index 0000000000..fcf968a903
--- /dev/null
+++ b/pinot-spi/src/test/resources/observer_stats_test_payload.json
@@ -0,0 +1,26 @@
+{
+  "testProperty": "some test property",
+  "currentState": "IN_PROGRESS",
+  "startTimestamp": 1740407875728,
+  "endTimestamp": 1740407875728,
+  "currentStage": "testStage",
+  "stageTimes": {
+    "IN_PROGRESS": {
+      "startTimeMs": 0,
+      "totalTimeMs": 0
+    },
+    "testStage": {
+      "startTimeMs": 0,
+      "totalTimeMs": 0
+    }
+  },
+  "progressLogs": [
+    {
+      "ts": 1740407875728,
+      "stage": "test",
+      "status": "task status",
+      "level": "INFO"
+    }
+  ],
+  "taskId": "randomString"
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to