This is an automated email from the ASF dual-hosted git repository. vvivekiyer pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 28aec2e01c Add some additional metrics for Minion tasks (#12710) 28aec2e01c is described below commit 28aec2e01cedfcfaf47e83f41f58f810ac153b31 Author: Vivek Iyer Vaidyanathan <vviveki...@gmail.com> AuthorDate: Sun Mar 31 18:54:27 2024 -0700 Add some additional metrics for Minion tasks (#12710) * Add some additional metrics for Minion tasks * Address review comments --- .../apache/pinot/common/metrics/MinionMeter.java | 8 +++++- .../apache/pinot/core/minion/SegmentPurger.java | 8 ++++-- .../BaseMultipleSegmentsConversionExecutor.java | 13 ++++++++- .../tasks/BaseSingleSegmentConversionExecutor.java | 22 +++++++++++++-- .../plugin/minion/tasks/BaseTaskExecutor.java | 33 ++++++++++++++++++++++ 5 files changed, 76 insertions(+), 8 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MinionMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/MinionMeter.java index 376f86e55e..c85aad39ed 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MinionMeter.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/MinionMeter.java @@ -31,7 +31,13 @@ public enum MinionMeter implements AbstractMetrics.Meter { NUMBER_TASKS_FAILED("tasks", false), NUMBER_TASKS_FATAL_FAILED("tasks", false), SEGMENT_UPLOAD_FAIL_COUNT("segments", false), - SEGMENT_DOWNLOAD_FAIL_COUNT("segments", false); + SEGMENT_DOWNLOAD_FAIL_COUNT("segments", false), + SEGMENT_DOWNLOAD_COUNT("segments", false), + SEGMENT_UPLOAD_COUNT("segments", false), + SEGMENT_BYTES_DOWNLOADED("bytes", false), + SEGMENT_BYTES_UPLOADED("bytes", false), + RECORDS_PROCESSED_COUNT("rows", false), + RECORDS_PURGED_COUNT("rows", false); private final String _meterName; private final String _unit; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java index 
4faf695522..2ab65bbe9c 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java @@ -70,7 +70,8 @@ public class SegmentPurger { throws Exception { SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir); String segmentName = segmentMetadata.getName(); - LOGGER.info("Start purging table: {}, segment: {}", _tableConfig.getTableName(), segmentName); + String tableNameWithType = _tableConfig.getTableName(); + LOGGER.info("Start purging table: {}, segment: {}", tableNameWithType, segmentName); try (PurgeRecordReader purgeRecordReader = new PurgeRecordReader()) { // Make a first pass through the data to see if records need to be purged or modified @@ -107,8 +108,9 @@ public class SegmentPurger { driver.build(); } - LOGGER.info("Finish purging table: {}, segment: {}, purged {} records, modified {} records", - _tableConfig.getTableName(), segmentName, _numRecordsPurged, _numRecordsModified); + LOGGER.info("Finish purging table: {}, segment: {}, purged {} records, modified {} records", tableNameWithType, + segmentName, _numRecordsPurged, _numRecordsModified); + return new File(_workingDir, segmentName); } diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java index 6b439add13..e7ef8a4eea 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java @@ -51,6 +51,7 @@ import org.apache.pinot.minion.event.MinionEventObserver; 
import org.apache.pinot.minion.event.MinionEventObservers; import org.apache.pinot.minion.exception.TaskCancelledException; import org.apache.pinot.segment.local.utils.SegmentPushUtils; +import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; import org.apache.pinot.spi.auth.AuthProvider; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.filesystem.PinotFS; @@ -192,6 +193,8 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe String crypterName = getTableConfig(tableNameWithType).getValidationConfig().getCrypterClassName(); try { List<File> inputSegmentDirs = new ArrayList<>(); + int numRecords = 0; + for (int i = 0; i < downloadURLs.length; i++) { // Download the segment file _eventObserver.notifyProgress(_pinotTaskConfig, String @@ -209,6 +212,10 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe if (!FileUtils.deleteQuietly(tarredSegmentFile)) { LOGGER.warn("Failed to delete tarred input segment: {}", tarredSegmentFile.getAbsolutePath()); } + + reportSegmentDownloadMetrics(indexDir, tableNameWithType, taskType); + SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir); + numRecords += segmentMetadata.getTotalDocs(); } // Convert the segments @@ -216,6 +223,8 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe Preconditions.checkState(workingDir.mkdir()); List<SegmentConversionResult> segmentConversionResults = convert(pinotTaskConfig, inputSegmentDirs, workingDir); + reportTaskProcessingMetrics(tableNameWithType, taskType, numRecords); + // Create a directory for converted tarred segment files File convertedTarredSegmentDir = new File(tempDataDir, "convertedTarredSegmentDir"); Preconditions.checkState(convertedTarredSegmentDir.mkdir()); @@ -224,11 +233,13 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe List<File> tarredSegmentFiles = new ArrayList<>(numOutputSegments); 
int count = 1; for (SegmentConversionResult segmentConversionResult : segmentConversionResults) { + File convertedSegmentDir = segmentConversionResult.getFile(); + reportSegmentUploadMetrics(convertedSegmentDir, tableNameWithType, taskType); + // Tar the converted segment _eventObserver.notifyProgress(_pinotTaskConfig, String .format("Compressing segment: %s (%d out of %d)", segmentConversionResult.getSegmentName(), count++, numOutputSegments)); - File convertedSegmentDir = segmentConversionResult.getFile(); File convertedSegmentTarFile = new File(convertedTarredSegmentDir, segmentConversionResult.getSegmentName() + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION); TarGzCompressionUtils.createTarGzFile(convertedSegmentDir, convertedSegmentTarFile); diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java index 22337ada6b..a920817ae9 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java @@ -36,7 +36,6 @@ import org.apache.pinot.common.Utils; import org.apache.pinot.common.auth.AuthProviderUtils; import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier; import org.apache.pinot.common.metrics.MinionMeter; -import org.apache.pinot.common.metrics.MinionMetrics; import org.apache.pinot.common.utils.FileUploadDownloadClient; import org.apache.pinot.common.utils.TarGzCompressionUtils; import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory; @@ -45,6 +44,8 @@ import org.apache.pinot.core.minion.PinotTaskConfig; import 
org.apache.pinot.minion.event.MinionEventObserver; import org.apache.pinot.minion.event.MinionEventObservers; import org.apache.pinot.minion.exception.TaskCancelledException; +import org.apache.pinot.plugin.minion.tasks.purge.PurgeTaskExecutor; +import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; import org.apache.pinot.spi.auth.AuthProvider; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.slf4j.Logger; @@ -60,8 +61,6 @@ import org.slf4j.LoggerFactory; public abstract class BaseSingleSegmentConversionExecutor extends BaseTaskExecutor { private static final Logger LOGGER = LoggerFactory.getLogger(BaseSingleSegmentConversionExecutor.class); - protected final MinionMetrics _minionMetrics = MinionMetrics.get(); - // Tracking finer grained progress status. protected PinotTaskConfig _pinotTaskConfig; protected MinionEventObserver _eventObserver; @@ -123,6 +122,9 @@ public abstract class BaseSingleSegmentConversionExecutor extends BaseTaskExecut LOGGER.warn("Failed to delete tarred input segment: {}", tarredSegmentFile.getAbsolutePath()); } + // Publish metrics related to segment download + reportSegmentDownloadMetrics(indexDir, tableNameWithType, taskType); + // Convert the segment File workingDir = new File(tempDataDir, "workingDir"); Preconditions.checkState(workingDir.mkdir()); @@ -135,6 +137,20 @@ public abstract class BaseSingleSegmentConversionExecutor extends BaseTaskExecut if (convertedSegmentDir == null) { return segmentConversionResult; } + + // Publish metrics related to segment upload + reportSegmentUploadMetrics(workingDir, tableNameWithType, taskType); + + // Collect the task processing metrics from various single segment executors and publish them here. 
+ SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir); + Object numRecordsPurged = segmentConversionResult.getCustomProperty(PurgeTaskExecutor.NUM_RECORDS_PURGED_KEY); + if (numRecordsPurged != null) { + reportTaskProcessingMetrics(tableNameWithType, taskType, segmentMetadata.getTotalDocs(), + (int) numRecordsPurged); + } else { + reportTaskProcessingMetrics(tableNameWithType, taskType, segmentMetadata.getTotalDocs()); + } + // Tar the converted segment _eventObserver.notifyProgress(_pinotTaskConfig, "Compressing segment: " + segmentName); File convertedTarredSegmentFile = diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseTaskExecutor.java index d85bf44737..2b57bbb8b4 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseTaskExecutor.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseTaskExecutor.java @@ -19,9 +19,13 @@ package org.apache.pinot.plugin.minion.tasks; import com.google.common.base.Preconditions; +import java.io.File; +import org.apache.commons.io.FileUtils; import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier; +import org.apache.pinot.common.metrics.MinionMeter; +import org.apache.pinot.common.metrics.MinionMetrics; import org.apache.pinot.core.minion.PinotTaskConfig; import org.apache.pinot.minion.MinionContext; import org.apache.pinot.minion.executor.PinotTaskExecutor; @@ -33,6 +37,7 @@ public abstract class BaseTaskExecutor implements PinotTaskExecutor { protected static final MinionContext MINION_CONTEXT = 
MinionContext.getInstance(); protected boolean _cancelled = false; + protected final MinionMetrics _minionMetrics = MinionMetrics.get(); @Override public void cancel() { @@ -68,4 +73,32 @@ public abstract class BaseTaskExecutor implements PinotTaskExecutor { */ return segmentZKMetadata == null ? -1 : segmentZKMetadata.getCrc(); } + + protected void reportSegmentDownloadMetrics(File indexDir, String tableNameWithType, String taskType) { + long downloadSegmentSize = FileUtils.sizeOfDirectory(indexDir); + addTaskMeterMetrics(MinionMeter.SEGMENT_BYTES_DOWNLOADED, downloadSegmentSize, tableNameWithType, taskType); + addTaskMeterMetrics(MinionMeter.SEGMENT_DOWNLOAD_COUNT, 1L, tableNameWithType, taskType); + } + + protected void reportSegmentUploadMetrics(File indexDir, String tableNameWithType, String taskType) { + long uploadSegmentSize = FileUtils.sizeOfDirectory(indexDir); + addTaskMeterMetrics(MinionMeter.SEGMENT_BYTES_UPLOADED, uploadSegmentSize, tableNameWithType, taskType); + addTaskMeterMetrics(MinionMeter.SEGMENT_UPLOAD_COUNT, 1L, tableNameWithType, taskType); + } + + protected void reportTaskProcessingMetrics(String tableNameWithType, String taskType, int numRecordsProcessed, + int numRecordsPurged) { + reportTaskProcessingMetrics(tableNameWithType, taskType, numRecordsProcessed); + addTaskMeterMetrics(MinionMeter.RECORDS_PURGED_COUNT, numRecordsPurged, tableNameWithType, taskType); + } + + protected void reportTaskProcessingMetrics(String tableNameWithType, String taskType, int numRecordsProcessed) { + addTaskMeterMetrics(MinionMeter.RECORDS_PROCESSED_COUNT, numRecordsProcessed, tableNameWithType, taskType); + } + + private void addTaskMeterMetrics(MinionMeter meter, long unitCount, String tableName, String taskType) { + _minionMetrics.addMeteredGlobalValue(meter, unitCount); + _minionMetrics.addMeteredTableValue(tableName, meter, unitCount); + _minionMetrics.addMeteredTableValue(tableName, taskType, meter, unitCount); + } } 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org