rajagopr commented on code in PR #15118:
URL: https://github.com/apache/pinot/pull/15118#discussion_r1968637124


##########
pinot-minion/src/main/java/org/apache/pinot/minion/api/resources/PinotTaskProgressResource.java:
##########
@@ -143,4 +144,36 @@ public String getSubtaskProgress(
           .build());
     }
   }
+
+  @GET
+  @Path("/tasks/subtask/progressStats")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation("Get task progress stats tracked for the given subtasks")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, 
message = "Internal server error")
+  })
+  public String getSubtaskProgressStats(
+      @ApiParam(value = "Sub task name") @QueryParam("subtaskName") String 
subtaskName) {
+    try {
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("Getting progress stats for subtask: {}", subtaskName);
+      }
+      Map<String, MinionTaskBaseObserverStats> progressStatsMap = new 
HashMap<>();
+      MinionEventObserver observer = 
MinionEventObservers.getInstance().getMinionEventObserver(subtaskName);
+      MinionTaskBaseObserverStats progressStats = observer.getProgressStats();

Review Comment:
   Won't this produce an NPE if observer is `null`?



##########
pinot-spi/src/main/java/org/apache/pinot/spi/tasks/MinionTaskBaseObserverStats.java:
##########
@@ -103,25 +140,70 @@ public boolean equals(Object o) {
       return false;
     }
     MinionTaskBaseObserverStats stats = (MinionTaskBaseObserverStats) o;
-    return _startTimestamp == stats.getStartTimestamp() && 
_taskId.equals(stats.getTaskId())
-        && _currentState.equals(stats.getCurrentState());
+    return _startTimestamp == stats.getStartTimestamp() && _endTimestamp == 
stats.getEndTimestamp()
+        && _taskId.equals(stats.getTaskId()) && 
_currentState.equals(stats.getCurrentState())
+        && _currentStage.equals(stats.getCurrentStage());
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(_taskId, _currentState, _startTimestamp);
+    return Objects.hash(_taskId, _currentStage, _currentState, 
_startTimestamp, _endTimestamp);
+  }
+
+  public static class Timer {
+    private long _totalTimeMs = 0;
+    private long _startTimeMs = 0;
+    private long _resumeTimeMs = 0;
+
+    public Timer() {
+    }
+
+    public Timer(@JsonProperty("totalTimeMs") long totalTimeMs,
+        @JsonProperty("startTimeMs") long startTimeMs,
+        @JsonProperty("resumeTimeMs") long resumeTimeMs) {
+      _totalTimeMs = totalTimeMs;
+      _startTimeMs = startTimeMs;
+      _resumeTimeMs = resumeTimeMs;
+    }
+
+    public void start() {
+      _startTimeMs = System.currentTimeMillis();
+      _resumeTimeMs = _startTimeMs;

Review Comment:
   What's the real use of the field `_resumeTimeMs`? Do we expect the same 
stage to started more than once?



##########
pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionProgressObserver.java:
##########
@@ -139,15 +162,42 @@ public long getStartTs() {
   }
 
   private synchronized void addStatus(MinionTaskBaseObserverStats.StatusEntry 
statusEntry) {
+    if (statusEntry == null) {
+      // if no status entry provided, only update the task progress if the 
_taskProgressStats has updated values
+      MinionTaskBaseObserverStats minionTaskObserverStats = 
_observerStorageManager.getTaskProgress(_taskId);
+      if (minionTaskObserverStats != null && 
!minionTaskObserverStats.equals(_taskProgressStats)) {
+        _observerStorageManager.setTaskProgress(_taskId, new 
MinionTaskBaseObserverStats(_taskProgressStats));
+      }
+      return;
+    }
+    String incomingStage = statusEntry.getStage();
+    if (_taskProgressStats.getCurrentStage() == null) {
+      _taskProgressStats.setCurrentStage(incomingStage != null ? incomingStage 
: MinionTaskState.UNKNOWN.name());

Review Comment:
   Add comment on conditions when the incoming stage would be null.



##########
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java:
##########
@@ -187,10 +188,15 @@ protected boolean completeMapAndTransformRow(RecordReader 
recordReader, GenericR
           writeRecord(transformedRow);
         }
       } catch (Exception e) {
+        String logMessage = "Caught exception while reading data.";
+        observer.accept(new MinionTaskBaseObserverStats.StatusEntry.Builder()
+            .withLevel(MinionTaskBaseObserverStats.StatusEntry.LogLevel.ERROR)
+            .withStatus(logMessage + " Reason : " + e.getMessage())

Review Comment:
   nit: The space between `Reason` and `:` could be avoided. `Reason: the error 
reason`.  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to