This is an automated email from the ASF dual-hosted git repository. manishswaminathan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 862a931667 Improve minion observer stats to capture more granular stages of minion task execution (#15118) 862a931667 is described below commit 862a9316670ccbf48f33af5ce780469d84dd5440 Author: Shounak kulkarni <shounakmk...@gmail.com> AuthorDate: Tue Mar 4 09:36:30 2025 +0530 Improve minion observer stats to capture more granular stages of minion task execution (#15118) * Add more stats to MinionTaskBaseObserverStats for better tracking * Add granular tracking to SegmentProcessorFramework * lint fixes * Add minion API to get the subtask stats * test fix * fixes * add comments * Add comments on _stageTimes --- .../framework/SegmentProcessorFramework.java | 35 +++++-- .../segment/processing/mapper/SegmentMapper.java | 10 +- .../api/resources/PinotTaskProgressResource.java | 39 +++++++ .../pinot/minion/event/MinionProgressObserver.java | 73 +++++++++++-- .../spi/tasks/MinionTaskBaseObserverStats.java | 115 +++++++++++++++++++-- .../spi/tasks/MinionTaskBaseObserverStatsTest.java | 43 +++++--- .../resources/observer_stats_test_payload.json | 26 +++++ 7 files changed, 302 insertions(+), 39 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java index a6d3f74042..596eb3266e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java @@ -43,6 +43,7 @@ import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.RecordReader; import org.apache.pinot.spi.data.readers.RecordReaderFileConfig; import org.apache.pinot.spi.recordtransformer.RecordTransformer; +import org.apache.pinot.spi.tasks.MinionTaskBaseObserverStats; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,6 +60,9 @@ import org.slf4j.LoggerFactory; */ public class SegmentProcessorFramework { private static final Logger LOGGER = LoggerFactory.getLogger(SegmentProcessorFramework.class); + public static final String MAP_STAGE = "MAP"; + public static final String REDUCE_STAGE = "REDUCE"; + public static final String GENERATE_STAGE = "GENERATE_SEGMENT"; private final List<RecordReaderFileConfig> _recordReaderFileConfigs; private final List<RecordTransformer> _customRecordTransformers; @@ -157,9 +161,9 @@ public class SegmentProcessorFramework { int numRecordReaders = _recordReaderFileConfigs.size(); int nextRecordReaderIndexToBeProcessed = 0; int iterationCount = 1; - Consumer<Object> observer = _segmentProcessorConfig.getProgressObserver(); boolean isMapperOutputSizeThresholdEnabled = _segmentProcessorConfig.getSegmentConfig().getIntermediateFileSizeThreshold() != Long.MAX_VALUE; + String logMessage; while (nextRecordReaderIndexToBeProcessed < numRecordReaders) { // Initialise the mapper. Eliminate the record readers that have been processed in the previous iterations. @@ -168,34 +172,42 @@ public class SegmentProcessorFramework { // Log start of iteration details only if intermediate file size threshold is set. if (isMapperOutputSizeThresholdEnabled) { - String logMessage = + logMessage = String.format("Starting iteration %d with %d record readers. Starting index = %d, end index = %d", iterationCount, _recordReaderFileConfigs.subList(nextRecordReaderIndexToBeProcessed, numRecordReaders).size(), nextRecordReaderIndexToBeProcessed + 1, numRecordReaders); LOGGER.info(logMessage); - observer.accept(logMessage); + logToObserver(MAP_STAGE, logMessage); } // Map phase. long mapStartTimeInMs = System.currentTimeMillis(); + logToObserver(MAP_STAGE, "Starting Map phase for iteration " + iterationCount); Map<String, GenericRowFileManager> partitionToFileManagerMap = mapper.map(); // Log the time taken to map. - LOGGER.info("Finished iteration {} in {}ms", iterationCount, System.currentTimeMillis() - mapStartTimeInMs); + logMessage = "Finished Map phase for iteration " + iterationCount + " in " + + (System.currentTimeMillis() - mapStartTimeInMs) + "ms"; + LOGGER.info(logMessage); + logToObserver(MAP_STAGE, logMessage); // Check for mapper output files, if no files are generated, skip the reducer phase and move on to the next // iteration. if (partitionToFileManagerMap.isEmpty()) { - LOGGER.info("No mapper output files generated, skipping reduce phase"); + logMessage = "No mapper output files generated, skipping reduce phase"; + LOGGER.info(logMessage); + logToObserver(MAP_STAGE, logMessage); nextRecordReaderIndexToBeProcessed = getNextRecordReaderIndexToBeProcessed(nextRecordReaderIndexToBeProcessed); continue; } // Reduce phase. + logToObserver(REDUCE_STAGE, "Starting Reduce phase for iteration " + iterationCount); doReduce(partitionToFileManagerMap); // Segment creation phase. Add the created segments to the final list. + logToObserver(GENERATE_STAGE, "Generating segments for iteration " + iterationCount); outputSegmentDirs.addAll(generateSegment(partitionToFileManagerMap)); // Store the starting index of the record readers that were processed in this iteration for logging purposes. @@ -213,7 +225,6 @@ public class SegmentProcessorFramework { // We are sure that the last RecordReader is completely processed in the last iteration else it may or may not // have completed processing. Log it accordingly. - String logMessage; if (nextRecordReaderIndexToBeProcessed == numRecordReaders) { logMessage = String.format("Finished processing all of %d RecordReaders", numRecordReaders); } else { @@ -222,16 +233,22 @@ public class SegmentProcessorFramework { + "iteration %d", startingProcessedRecordReaderIndex + 1, boundaryIndexToLog, nextRecordReaderIndexToBeProcessed + 1, numRecordReaders, iterationCount); } - - observer.accept(logMessage); LOGGER.info(logMessage); + logToObserver(GENERATE_STAGE, logMessage); } - iterationCount++; } return outputSegmentDirs; } + private void logToObserver(String stage, String logMessage) { + _segmentProcessorConfig.getProgressObserver() + .accept(new MinionTaskBaseObserverStats.StatusEntry.Builder() + .withStage(stage) + .withStatus(logMessage) + .build()); + } + protected SegmentMapper getSegmentMapper(List<RecordReaderFileConfig> recordReaderFileConfigs) { if (_transformPipeline != null) { return new SegmentMapper(recordReaderFileConfigs, _transformPipeline, _segmentProcessorConfig, _mapperOutputDir); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java index 3ec0a539c7..2697846f79 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java @@ -48,6 +48,7 @@ import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.data.readers.RecordReader; import org.apache.pinot.spi.data.readers.RecordReaderFileConfig; import org.apache.pinot.spi.recordtransformer.RecordTransformer; +import org.apache.pinot.spi.tasks.MinionTaskBaseObserverStats; import org.apache.pinot.spi.utils.StringUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -187,10 +188,15 @@ public class SegmentMapper { writeRecord(transformedRow); } } catch (Exception e) { + String logMessage = "Caught exception while reading data."; + observer.accept(new MinionTaskBaseObserverStats.StatusEntry.Builder() + .withLevel(MinionTaskBaseObserverStats.StatusEntry.LogLevel.ERROR) + .withStatus(logMessage + " Reason: " + e.getMessage()) + .build()); if (!continueOnError) { - throw new RuntimeException("Caught exception while reading data", e); + throw new RuntimeException(logMessage, e); } else { - LOGGER.debug("Caught exception while reading data", e); + LOGGER.debug(logMessage, e); continue; } } diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/api/resources/PinotTaskProgressResource.java b/pinot-minion/src/main/java/org/apache/pinot/minion/api/resources/PinotTaskProgressResource.java index ac91835c76..95a9a8efce 100644 --- a/pinot-minion/src/main/java/org/apache/pinot/minion/api/resources/PinotTaskProgressResource.java +++ b/pinot-minion/src/main/java/org/apache/pinot/minion/api/resources/PinotTaskProgressResource.java @@ -45,6 +45,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.pinot.minion.event.MinionEventObserver; import org.apache.pinot.minion.event.MinionEventObservers; import org.apache.pinot.minion.event.MinionTaskState; +import org.apache.pinot.spi.tasks.MinionTaskBaseObserverStats; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.JsonUtils; import org.slf4j.Logger; @@ -143,4 +144,42 @@ public class PinotTaskProgressResource { .build()); } } + + @GET + @Path("/tasks/subtask/progressStats") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation("Get task progress stats tracked for the given subtasks") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error") + }) + public String getSubtaskProgressStats( + @ApiParam(value = "Sub task name") @QueryParam("subtaskName") String subtaskName) { + try { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Getting progress stats for subtask: {}", subtaskName); + } + Map<String, MinionTaskBaseObserverStats> progressStatsMap = new HashMap<>(); + MinionEventObserver observer = MinionEventObservers.getInstance().getMinionEventObserver(subtaskName); + if (observer == null) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("MinionEventObserver does not exist for subtask: {}", subtaskName); + } + return JsonUtils.objectToString(progressStatsMap); + } + MinionTaskBaseObserverStats progressStats = observer.getProgressStats(); + if (progressStats != null) { + progressStats.setProgressLogs(null); + progressStatsMap.put(subtaskName, progressStats); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Got subtasks progress stats: {}", progressStats); + } + } + return JsonUtils.objectToString(progressStatsMap); + } catch (Exception e) { + throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity( + String.format("Failed to get task progress stats for subtask: %s due to error: %s", + subtaskName, e.getMessage())) + .build()); + } + } } diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionProgressObserver.java b/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionProgressObserver.java index a68011207c..7f3b11870f 100644 --- a/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionProgressObserver.java +++ b/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionProgressObserver.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Deque; import java.util.LinkedList; import java.util.List; +import java.util.Map; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import org.apache.commons.lang3.exception.ExceptionUtils; @@ -38,7 +39,7 @@ import org.slf4j.LoggerFactory; public class MinionProgressObserver extends DefaultMinionEventObserver { private static final Logger LOGGER = LoggerFactory.getLogger(MinionProgressObserver.class); - protected MinionTaskBaseObserverStats _taskProgressStats = new MinionTaskBaseObserverStats(); + protected final MinionTaskBaseObserverStats _taskProgressStats = new MinionTaskBaseObserverStats(); protected String _taskId; @Override @@ -50,6 +51,7 @@ public class MinionProgressObserver extends DefaultMinionEventObserver { addStatus(new MinionTaskBaseObserverStats.StatusEntry.Builder() .withTs(_taskProgressStats.getStartTimestamp()) .withStatus("Task started") + .withStage(MinionTaskState.IN_PROGRESS.name()) .build()); super.notifyTaskStart(pinotTaskConfig); } @@ -62,14 +64,29 @@ public class MinionProgressObserver extends DefaultMinionEventObserver { */ @Override public synchronized void notifyProgress(PinotTaskConfig pinotTaskConfig, @Nullable Object progress) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Update progress: {} for task: {}", progress, pinotTaskConfig.getTaskId()); - } + String progressMessage = null; + MinionTaskBaseObserverStats.StatusEntry statusEntry = null; _taskProgressStats.setCurrentState(MinionTaskState.IN_PROGRESS.name()); - addStatus(new MinionTaskBaseObserverStats.StatusEntry.Builder() - .withTs(System.currentTimeMillis()) - .withStatus((progress == null) ? "" : progress.toString()) - .build()); + if (progress instanceof MinionTaskBaseObserverStats.StatusEntry) { + statusEntry = (MinionTaskBaseObserverStats.StatusEntry) progress; + progressMessage = statusEntry.getStatus(); + } else if (progress instanceof MinionTaskBaseObserverStats) { + MinionTaskBaseObserverStats stats = (MinionTaskBaseObserverStats) progress; + // Only one progress log must be recorded at once and should not be bulked + if (stats.getProgressLogs() != null) { + statusEntry = stats.getProgressLogs().pollFirst(); + progressMessage = statusEntry != null ? statusEntry.getStatus() : null; + } + } else if (progress != null) { + progressMessage = progress.toString(); + statusEntry = new MinionTaskBaseObserverStats.StatusEntry.Builder() + .withStatus(progressMessage) + .build(); + } + if (LOGGER.isDebugEnabled() && progressMessage != null) { + LOGGER.debug("Update progress: {} for task: {}", progressMessage, pinotTaskConfig.getTaskId()); + } + addStatus(statusEntry); super.notifyProgress(pinotTaskConfig, progress); } @@ -93,9 +110,11 @@ public class MinionProgressObserver extends DefaultMinionEventObserver { public synchronized void notifyTaskSuccess(PinotTaskConfig pinotTaskConfig, @Nullable Object executionResult) { long endTs = System.currentTimeMillis(); _taskProgressStats.setCurrentState(MinionTaskState.SUCCEEDED.name()); + _taskProgressStats.setEndTimestamp(endTs); addStatus(new MinionTaskBaseObserverStats.StatusEntry.Builder() .withTs(endTs) .withStatus("Task succeeded in " + (endTs - _taskProgressStats.getStartTimestamp()) + "ms") + .withStage(MinionTaskState.SUCCEEDED.name()) .build()); super.notifyTaskSuccess(pinotTaskConfig, executionResult); } @@ -104,10 +123,12 @@ public class MinionProgressObserver extends DefaultMinionEventObserver { public synchronized void notifyTaskCancelled(PinotTaskConfig pinotTaskConfig) { long endTs = System.currentTimeMillis(); _taskProgressStats.setCurrentState(MinionTaskState.CANCELLED.name()); + _taskProgressStats.setEndTimestamp(endTs); addStatus(new MinionTaskBaseObserverStats.StatusEntry.Builder() .withTs(endTs) .withLevel(MinionTaskBaseObserverStats.StatusEntry.LogLevel.WARN) .withStatus("Task got cancelled after " + (endTs - _taskProgressStats.getStartTimestamp()) + "ms") + .withStage(MinionTaskState.CANCELLED.name()) .build()); super.notifyTaskCancelled(pinotTaskConfig); } @@ -116,11 +137,13 @@ public class MinionProgressObserver extends DefaultMinionEventObserver { public synchronized void notifyTaskError(PinotTaskConfig pinotTaskConfig, Exception e) { long endTs = System.currentTimeMillis(); _taskProgressStats.setCurrentState(MinionTaskState.ERROR.name()); + _taskProgressStats.setEndTimestamp(endTs); addStatus(new MinionTaskBaseObserverStats.StatusEntry.Builder() .withTs(endTs) .withLevel(MinionTaskBaseObserverStats.StatusEntry.LogLevel.ERROR) .withStatus("Task failed in " + (endTs - _taskProgressStats.getStartTimestamp()) + "ms with error: " + ExceptionUtils.getStackTrace(e)) + .withStage(MinionTaskState.ERROR.name()) .build()); super.notifyTaskError(pinotTaskConfig, e); } @@ -139,6 +162,37 @@ public class MinionProgressObserver extends DefaultMinionEventObserver { } private synchronized void addStatus(MinionTaskBaseObserverStats.StatusEntry statusEntry) { + if (statusEntry == null) { + // if no status entry provided, only update the task progress if the _taskProgressStats has updated values + MinionTaskBaseObserverStats minionTaskObserverStats = _observerStorageManager.getTaskProgress(_taskId); + if (minionTaskObserverStats != null && !minionTaskObserverStats.equals(_taskProgressStats)) { + _observerStorageManager.setTaskProgress(_taskId, new MinionTaskBaseObserverStats(_taskProgressStats)); + } + return; + } + String incomingStage = statusEntry.getStage(); + if (_taskProgressStats.getCurrentStage() == null) { + // typically incomingStage won't be null when current stage is also null as notifyTaskStart is the first + // that gets called during task execution. + // This handling is mostly for testing purpose + _taskProgressStats.setCurrentStage(incomingStage != null ? incomingStage : MinionTaskState.UNKNOWN.name()); + } + String currentStage = _taskProgressStats.getCurrentStage(); + Map<String, MinionTaskBaseObserverStats.Timer> stageTimes = _taskProgressStats.getStageTimes(); + if (incomingStage != null && !currentStage.equals(incomingStage)) { + // stage transition + stageTimes.get(currentStage).stop(); + currentStage = incomingStage; + _taskProgressStats.setCurrentStage(currentStage); + } else { + // carry forward current stage if stage not specified + statusEntry.updateStage(currentStage); + } + if (!stageTimes.containsKey(currentStage)) { + stageTimes.put(currentStage, new MinionTaskBaseObserverStats.Timer()); + stageTimes.get(currentStage).start(); + } + MinionTaskBaseObserverStats minionTaskObserverStats = _observerStorageManager.getTaskProgress(_taskId); Deque<MinionTaskBaseObserverStats.StatusEntry> progressLogs; if (minionTaskObserverStats != null) { @@ -146,8 +200,7 @@ public class MinionProgressObserver extends DefaultMinionEventObserver { } else { progressLogs = new LinkedList<>(); } - progressLogs.add(statusEntry); - + progressLogs.offer(statusEntry); _observerStorageManager.setTaskProgress(_taskId, new MinionTaskBaseObserverStats(_taskProgressStats) .setProgressLogs(progressLogs)); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/tasks/MinionTaskBaseObserverStats.java b/pinot-spi/src/main/java/org/apache/pinot/spi/tasks/MinionTaskBaseObserverStats.java index d21b35fb1c..37580cc08c 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/tasks/MinionTaskBaseObserverStats.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/tasks/MinionTaskBaseObserverStats.java @@ -18,10 +18,14 @@ */ package org.apache.pinot.spi.tasks; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import java.util.Deque; +import java.util.HashMap; import java.util.LinkedList; +import java.util.Map; import java.util.Objects; import org.apache.pinot.spi.utils.JsonUtils; @@ -34,8 +38,13 @@ import org.apache.pinot.spi.utils.JsonUtils; */ public class MinionTaskBaseObserverStats { protected String _taskId; + protected String _currentStage; protected String _currentState; protected long _startTimestamp; + protected long _endTimestamp; + // A map to keep track of the time spent on each stage of a task execution + // This stat will be managed by the observer and executor should not worry about maintaining it + protected Map<String, Timer> _stageTimes = new HashMap<>(); protected Deque<StatusEntry> _progressLogs = new LinkedList<>(); public MinionTaskBaseObserverStats() { @@ -44,7 +53,10 @@ public class MinionTaskBaseObserverStats { public MinionTaskBaseObserverStats(MinionTaskBaseObserverStats from) { _taskId = from.getTaskId(); _currentState = from.getCurrentState(); + _currentStage = from.getCurrentStage(); _startTimestamp = from.getStartTimestamp(); + _endTimestamp = from.getEndTimestamp(); + _stageTimes = from.getStageTimes(); _progressLogs = new LinkedList<>(from.getProgressLogs()); } @@ -66,6 +78,15 @@ public class MinionTaskBaseObserverStats { return this; } + public long getEndTimestamp() { + return _endTimestamp; + } + + public MinionTaskBaseObserverStats setEndTimestamp(long endTimestamp) { + _endTimestamp = endTimestamp; + return this; + } + public String getCurrentState() { return _currentState; } @@ -75,6 +96,24 @@ public class MinionTaskBaseObserverStats { return this; } + public String getCurrentStage() { + return _currentStage; + } + + public MinionTaskBaseObserverStats setCurrentStage(String currentStage) { + _currentStage = currentStage; + return this; + } + + public Map<String, Timer> getStageTimes() { + return _stageTimes; + } + + public MinionTaskBaseObserverStats setStageTimes(Map<String, Timer> stageTimes) { + _stageTimes = stageTimes; + return this; + } + public Deque<StatusEntry> getProgressLogs() { return _progressLogs; } @@ -103,13 +142,56 @@ public class MinionTaskBaseObserverStats { return false; } MinionTaskBaseObserverStats stats = (MinionTaskBaseObserverStats) o; - return _startTimestamp == stats.getStartTimestamp() && _taskId.equals(stats.getTaskId()) - && _currentState.equals(stats.getCurrentState()); + return _startTimestamp == stats.getStartTimestamp() && _endTimestamp == stats.getEndTimestamp() + && _taskId.equals(stats.getTaskId()) && _currentState.equals(stats.getCurrentState()) + && _currentStage.equals(stats.getCurrentStage()); } @Override public int hashCode() { - return Objects.hash(_taskId, _currentState, _startTimestamp); + return Objects.hash(_taskId, _currentStage, _currentState, _startTimestamp, _endTimestamp); + } + + public static class Timer { + private long _totalTimeMs = 0; + private long _startTimeMs = 0; + private long _resumeTimeMs = 0; + + public Timer() { + } + + public Timer(@JsonProperty("totalTimeMs") long totalTimeMs, + @JsonProperty("startTimeMs") long startTimeMs, + @JsonProperty("resumeTimeMs") long resumeTimeMs) { + _totalTimeMs = totalTimeMs; + _startTimeMs = startTimeMs; + _resumeTimeMs = resumeTimeMs; + } + + public void start() { + _startTimeMs = System.currentTimeMillis(); + _resumeTimeMs = _startTimeMs; + } + + public void stop() { + if (_resumeTimeMs != 0) { + _totalTimeMs += System.currentTimeMillis() - _resumeTimeMs; + _resumeTimeMs = 0; + } + } + + public long getStartTimeMs() { + return _startTimeMs; + } + + public long getTotalTimeMs() { + return _totalTimeMs; + } + + @JsonIgnore + public long getResumeTimeMs() { + return _resumeTimeMs; + } } @JsonDeserialize(builder = StatusEntry.Builder.class) @@ -117,11 +199,13 @@ public class MinionTaskBaseObserverStats { private final long _ts; private final LogLevel _level; private final String _status; + private String _stage; - public StatusEntry(long ts, LogLevel level, String status) { + private StatusEntry(long ts, LogLevel level, String status, String stage) { _ts = ts; _level = level != null ? level : LogLevel.INFO; _status = status; + _stage = stage; } public long getTs() { @@ -136,14 +220,28 @@ public class MinionTaskBaseObserverStats { return _level; } + public String getStage() { + return _stage; + } + + public boolean updateStage(String stage) { + if (_stage == null) { + _stage = stage; + return true; + } + return false; + } + @Override public String toString() { - return "{\"ts\" : " + _ts + ", \"level\" : \"" + _level + "\", \"status\" : \"" + _status + "\"}"; + return "{\"ts\" : " + _ts + ", \"level\" : \"" + _level + "\", \"stage\" : \"" + _stage + + "\", \"status\" : \"" + _status + "\"}"; } public static class Builder { private long _ts; private LogLevel _level = LogLevel.INFO; + private String _stage; private String _status; public Builder withTs(long ts) { @@ -156,6 +254,11 @@ public class MinionTaskBaseObserverStats { return this; } + public Builder withStage(String stage) { + _stage = stage; + return this; + } + public Builder withStatus(String status) { _status = status; return this; @@ -165,7 +268,7 @@ public class MinionTaskBaseObserverStats { if (_ts == 0) { _ts = System.currentTimeMillis(); } - return new StatusEntry(_ts, _level, _status); + return new StatusEntry(_ts, _level, _status, _stage); } } diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/tasks/MinionTaskBaseObserverStatsTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/tasks/MinionTaskBaseObserverStatsTest.java index 126e9d43e3..f96dc6dfb7 100644 --- a/pinot-spi/src/test/java/org/apache/pinot/spi/tasks/MinionTaskBaseObserverStatsTest.java +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/tasks/MinionTaskBaseObserverStatsTest.java @@ -19,7 +19,13 @@ package org.apache.pinot.spi.tasks; import com.fasterxml.jackson.core.JsonProcessingException; +import java.io.File; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; import java.util.Objects; +import org.apache.commons.io.FileUtils; import org.apache.pinot.spi.utils.JsonUtils; import org.testng.Assert; import org.testng.annotations.Test; @@ -29,28 +35,41 @@ public class MinionTaskBaseObserverStatsTest { private static final String TEST_PROPERTY = "some test property"; private static final String TASK_ID = "randomString"; private static final String CURRENT_STATE = "IN_PROGRESS"; - private static final long TS = System.currentTimeMillis(); + private static final String CURRENT_STAGE = "testStage"; + private static final long TS = 1740407875728L; private static final String STATUS = "task status"; @Test public void testSerDeser() - throws JsonProcessingException { + throws Exception { + Map<String, MinionTaskBaseObserverStats.Timer> stageTimes = new HashMap<>(); + stageTimes.put(CURRENT_STATE, new MinionTaskBaseObserverStats.Timer()); + stageTimes.put(CURRENT_STAGE, new MinionTaskBaseObserverStats.Timer()); TestObserverStats stats = (TestObserverStats) new TestObserverStats() .setTestProperty(TEST_PROPERTY) .setTaskId(TASK_ID) .setCurrentState(CURRENT_STATE) - .setStartTimestamp(TS); - stats.getProgressLogs().offer(new MinionTaskBaseObserverStats.StatusEntry( - TS, MinionTaskBaseObserverStats.StatusEntry.LogLevel.INFO, STATUS)); + .setCurrentStage(CURRENT_STAGE) + .setStartTimestamp(TS) + .setEndTimestamp(TS) + .setStageTimes(stageTimes); + stats.getProgressLogs().offer(new MinionTaskBaseObserverStats.StatusEntry.Builder() + .withTs(TS) + .withLevel(MinionTaskBaseObserverStats.StatusEntry.LogLevel.INFO) + .withStatus(STATUS) + .withStage("test") + .build()); String statsString = getTestObjectString(); TestObserverStats stats2 = stats.fromJsonString(statsString); Assert.assertEquals(stats2, stats); } - private String getTestObjectString() { - return "{\"testProperty\":\"" + TEST_PROPERTY + "\",\"startTimestamp\":" + TS - + ",\"currentState\":\"" + CURRENT_STATE + "\",\"progressLogs\":[{\"level\":\"INFO\",\"status\":\"" + STATUS - + "\",\"ts\":" + TS + "}],\"taskId\":\"" + TASK_ID + "\"}"; + private String getTestObjectString() + throws Exception { + URL resource = MinionTaskBaseObserverStatsTest.class.getClassLoader() + .getResource("observer_stats_test_payload.json"); + Assert.assertNotNull(resource); + return FileUtils.readFileToString(new File(resource.toURI()), StandardCharsets.UTF_8); } public static class TestObserverStats extends MinionTaskBaseObserverStats { @@ -74,8 +93,8 @@ public class MinionTaskBaseObserverStatsTest { @Override public boolean equals(Object o) { if (this == o) { - return true; - } + return true; + } if (o == null || getClass() != o.getClass()) { return false; } @@ -85,7 +104,7 @@ public class MinionTaskBaseObserverStatsTest { @Override public int hashCode() { - return Objects.hash(getTaskId(), getCurrentState(), getStartTimestamp(), getTestProperty()); + return Objects.hash(super.hashCode(), _testProperty); } } } diff --git a/pinot-spi/src/test/resources/observer_stats_test_payload.json b/pinot-spi/src/test/resources/observer_stats_test_payload.json new file mode 100644 index 0000000000..fcf968a903 --- /dev/null +++ b/pinot-spi/src/test/resources/observer_stats_test_payload.json @@ -0,0 +1,26 @@ +{ + "testProperty": "some test property", + "currentState": "IN_PROGRESS", + "startTimestamp": 1740407875728, + "endTimestamp": 1740407875728, + "currentStage": "testStage", + "stageTimes": { + "IN_PROGRESS": { + "startTimeMs": 0, + "totalTimeMs": 0 + }, + "testStage": { + "startTimeMs": 0, + "totalTimeMs": 0 + } + }, + "progressLogs": [ + { + "ts": 1740407875728, + "stage": "test", + "status": "task status", + "level": "INFO" + } + ], + "taskId": "randomString" +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org