This is an automated email from the ASF dual-hosted git repository.

vvivekiyer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 28aec2e01c Add some additional metrics for Minion tasks (#12710)
28aec2e01c is described below

commit 28aec2e01cedfcfaf47e83f41f58f810ac153b31
Author: Vivek Iyer Vaidyanathan <vviveki...@gmail.com>
AuthorDate: Sun Mar 31 18:54:27 2024 -0700

    Add some additional metrics for Minion tasks (#12710)
    
    * Add some additional metrics for Minion tasks
    
    * Address review comments
---
 .../apache/pinot/common/metrics/MinionMeter.java   |  8 +++++-
 .../apache/pinot/core/minion/SegmentPurger.java    |  8 ++++--
 .../BaseMultipleSegmentsConversionExecutor.java    | 13 ++++++++-
 .../tasks/BaseSingleSegmentConversionExecutor.java | 22 +++++++++++++--
 .../plugin/minion/tasks/BaseTaskExecutor.java      | 33 ++++++++++++++++++++++
 5 files changed, 76 insertions(+), 8 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MinionMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/MinionMeter.java
index 376f86e55e..c85aad39ed 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MinionMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/MinionMeter.java
@@ -31,7 +31,13 @@ public enum MinionMeter implements AbstractMetrics.Meter {
   NUMBER_TASKS_FAILED("tasks", false),
   NUMBER_TASKS_FATAL_FAILED("tasks", false),
   SEGMENT_UPLOAD_FAIL_COUNT("segments", false),
-  SEGMENT_DOWNLOAD_FAIL_COUNT("segments", false);
+  SEGMENT_DOWNLOAD_FAIL_COUNT("segments", false),
+  SEGMENT_DOWNLOAD_COUNT("segments", false),
+  SEGMENT_UPLOAD_COUNT("segments", false),
+  SEGMENT_BYTES_DOWNLOADED("bytes", false),
+  SEGMENT_BYTES_UPLOADED("bytes", false),
+  RECORDS_PROCESSED_COUNT("rows", false),
+  RECORDS_PURGED_COUNT("rows", false);
 
   private final String _meterName;
   private final String _unit;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
index 4faf695522..2ab65bbe9c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
@@ -70,7 +70,8 @@ public class SegmentPurger {
       throws Exception {
     SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
     String segmentName = segmentMetadata.getName();
-    LOGGER.info("Start purging table: {}, segment: {}", 
_tableConfig.getTableName(), segmentName);
+    String tableNameWithType = _tableConfig.getTableName();
+    LOGGER.info("Start purging table: {}, segment: {}", tableNameWithType, 
segmentName);
 
     try (PurgeRecordReader purgeRecordReader = new PurgeRecordReader()) {
       // Make a first pass through the data to see if records need to be 
purged or modified
@@ -107,8 +108,9 @@ public class SegmentPurger {
       driver.build();
     }
 
-    LOGGER.info("Finish purging table: {}, segment: {}, purged {} records, 
modified {} records",
-        _tableConfig.getTableName(), segmentName, _numRecordsPurged, 
_numRecordsModified);
+    LOGGER.info("Finish purging table: {}, segment: {}, purged {} records, 
modified {} records", tableNameWithType,
+        segmentName, _numRecordsPurged, _numRecordsModified);
+
     return new File(_workingDir, segmentName);
   }
 
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
index 6b439add13..e7ef8a4eea 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
@@ -51,6 +51,7 @@ import org.apache.pinot.minion.event.MinionEventObserver;
 import org.apache.pinot.minion.event.MinionEventObservers;
 import org.apache.pinot.minion.exception.TaskCancelledException;
 import org.apache.pinot.segment.local.utils.SegmentPushUtils;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.spi.auth.AuthProvider;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.filesystem.PinotFS;
@@ -192,6 +193,8 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
     String crypterName = 
getTableConfig(tableNameWithType).getValidationConfig().getCrypterClassName();
     try {
       List<File> inputSegmentDirs = new ArrayList<>();
+      int numRecords = 0;
+
       for (int i = 0; i < downloadURLs.length; i++) {
         // Download the segment file
         _eventObserver.notifyProgress(_pinotTaskConfig, String
@@ -209,6 +212,10 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
         if (!FileUtils.deleteQuietly(tarredSegmentFile)) {
           LOGGER.warn("Failed to delete tarred input segment: {}", 
tarredSegmentFile.getAbsolutePath());
         }
+
+        reportSegmentDownloadMetrics(indexDir, tableNameWithType, taskType);
+        SegmentMetadataImpl segmentMetadata = new 
SegmentMetadataImpl(indexDir);
+        numRecords += segmentMetadata.getTotalDocs();
       }
 
       // Convert the segments
@@ -216,6 +223,8 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
       Preconditions.checkState(workingDir.mkdir());
       List<SegmentConversionResult> segmentConversionResults = 
convert(pinotTaskConfig, inputSegmentDirs, workingDir);
 
+      reportTaskProcessingMetrics(tableNameWithType, taskType, numRecords);
+
       // Create a directory for converted tarred segment files
       File convertedTarredSegmentDir = new File(tempDataDir, 
"convertedTarredSegmentDir");
       Preconditions.checkState(convertedTarredSegmentDir.mkdir());
@@ -224,11 +233,13 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
       List<File> tarredSegmentFiles = new ArrayList<>(numOutputSegments);
       int count = 1;
       for (SegmentConversionResult segmentConversionResult : 
segmentConversionResults) {
+        File convertedSegmentDir = segmentConversionResult.getFile();
+        reportSegmentUploadMetrics(convertedSegmentDir, tableNameWithType, 
taskType);
+
         // Tar the converted segment
         _eventObserver.notifyProgress(_pinotTaskConfig, String
             .format("Compressing segment: %s (%d out of %d)", 
segmentConversionResult.getSegmentName(), count++,
                 numOutputSegments));
-        File convertedSegmentDir = segmentConversionResult.getFile();
         File convertedSegmentTarFile = new File(convertedTarredSegmentDir,
             segmentConversionResult.getSegmentName() + 
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
         TarGzCompressionUtils.createTarGzFile(convertedSegmentDir, 
convertedSegmentTarFile);
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
index 22337ada6b..a920817ae9 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
@@ -36,7 +36,6 @@ import org.apache.pinot.common.Utils;
 import org.apache.pinot.common.auth.AuthProviderUtils;
 import 
org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
 import org.apache.pinot.common.metrics.MinionMeter;
-import org.apache.pinot.common.metrics.MinionMetrics;
 import org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
@@ -45,6 +44,8 @@ import org.apache.pinot.core.minion.PinotTaskConfig;
 import org.apache.pinot.minion.event.MinionEventObserver;
 import org.apache.pinot.minion.event.MinionEventObservers;
 import org.apache.pinot.minion.exception.TaskCancelledException;
+import org.apache.pinot.plugin.minion.tasks.purge.PurgeTaskExecutor;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.spi.auth.AuthProvider;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
@@ -60,8 +61,6 @@ import org.slf4j.LoggerFactory;
 public abstract class BaseSingleSegmentConversionExecutor extends 
BaseTaskExecutor {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(BaseSingleSegmentConversionExecutor.class);
 
-  protected final MinionMetrics _minionMetrics = MinionMetrics.get();
-
   // Tracking finer grained progress status.
   protected PinotTaskConfig _pinotTaskConfig;
   protected MinionEventObserver _eventObserver;
@@ -123,6 +122,9 @@ public abstract class BaseSingleSegmentConversionExecutor extends BaseTaskExecut
         LOGGER.warn("Failed to delete tarred input segment: {}", 
tarredSegmentFile.getAbsolutePath());
       }
 
+      // Publish metrics related to segment download
+      reportSegmentDownloadMetrics(indexDir, tableNameWithType, taskType);
+
       // Convert the segment
       File workingDir = new File(tempDataDir, "workingDir");
       Preconditions.checkState(workingDir.mkdir());
@@ -135,6 +137,20 @@ public abstract class BaseSingleSegmentConversionExecutor extends BaseTaskExecut
       if (convertedSegmentDir == null) {
         return segmentConversionResult;
       }
+
+      // Publish metrics related to segment upload
+      reportSegmentUploadMetrics(workingDir, tableNameWithType, taskType);
+
+      // Collect the task processing metrics from various single segment 
executors and publish them here.
+      SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir);
+      Object numRecordsPurged = 
segmentConversionResult.getCustomProperty(PurgeTaskExecutor.NUM_RECORDS_PURGED_KEY);
+      if (numRecordsPurged != null) {
+        reportTaskProcessingMetrics(tableNameWithType, taskType, 
segmentMetadata.getTotalDocs(),
+            (int) numRecordsPurged);
+       } else {
+        reportTaskProcessingMetrics(tableNameWithType, taskType, 
segmentMetadata.getTotalDocs());
+      }
+
       // Tar the converted segment
       _eventObserver.notifyProgress(_pinotTaskConfig, "Compressing segment: " 
+ segmentName);
       File convertedTarredSegmentFile =
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseTaskExecutor.java
index d85bf44737..2b57bbb8b4 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseTaskExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseTaskExecutor.java
@@ -19,9 +19,13 @@
 package org.apache.pinot.plugin.minion.tasks;
 
 import com.google.common.base.Preconditions;
+import java.io.File;
+import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import 
org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
+import org.apache.pinot.common.metrics.MinionMeter;
+import org.apache.pinot.common.metrics.MinionMetrics;
 import org.apache.pinot.core.minion.PinotTaskConfig;
 import org.apache.pinot.minion.MinionContext;
 import org.apache.pinot.minion.executor.PinotTaskExecutor;
@@ -33,6 +37,7 @@ public abstract class BaseTaskExecutor implements PinotTaskExecutor {
   protected static final MinionContext MINION_CONTEXT = 
MinionContext.getInstance();
 
   protected boolean _cancelled = false;
+  protected final MinionMetrics _minionMetrics = MinionMetrics.get();
 
   @Override
   public void cancel() {
@@ -68,4 +73,32 @@ public abstract class BaseTaskExecutor implements PinotTaskExecutor {
      */
     return segmentZKMetadata == null ? -1 : segmentZKMetadata.getCrc();
   }
+
+  protected void reportSegmentDownloadMetrics(File indexDir, String 
tableNameWithType, String taskType) {
+    long downloadSegmentSize = FileUtils.sizeOfDirectory(indexDir);
+    addTaskMeterMetrics(MinionMeter.SEGMENT_BYTES_DOWNLOADED, 
downloadSegmentSize, tableNameWithType, taskType);
+    addTaskMeterMetrics(MinionMeter.SEGMENT_DOWNLOAD_COUNT, 1L, 
tableNameWithType, taskType);
+  }
+
+  protected void reportSegmentUploadMetrics(File indexDir, String 
tableNameWithType, String taskType) {
+    long uploadSegmentSize = FileUtils.sizeOfDirectory(indexDir);
+    addTaskMeterMetrics(MinionMeter.SEGMENT_BYTES_UPLOADED, uploadSegmentSize, 
tableNameWithType, taskType);
+    addTaskMeterMetrics(MinionMeter.SEGMENT_UPLOAD_COUNT, 1L, 
tableNameWithType, taskType);
+  }
+
+  protected void reportTaskProcessingMetrics(String tableNameWithType, String 
taskType, int numRecordsProcessed,
+      int numRecordsPurged) {
+    reportTaskProcessingMetrics(tableNameWithType, taskType, 
numRecordsProcessed);
+    addTaskMeterMetrics(MinionMeter.RECORDS_PURGED_COUNT, numRecordsPurged, 
tableNameWithType, taskType);
+  }
+
+  protected void reportTaskProcessingMetrics(String tableNameWithType, String 
taskType, int numRecordsProcessed) {
+    addTaskMeterMetrics(MinionMeter.RECORDS_PROCESSED_COUNT, 
numRecordsProcessed, tableNameWithType, taskType);
+  }
+
+  private void addTaskMeterMetrics(MinionMeter meter, long unitCount, String 
tableName, String taskType) {
+    _minionMetrics.addMeteredGlobalValue(meter, unitCount);
+    _minionMetrics.addMeteredTableValue(tableName, meter, unitCount);
+    _minionMetrics.addMeteredTableValue(tableName, taskType, meter, unitCount);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to