This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new fc8ee6a09c Metrics for Segment Upload/Download (#15585)
fc8ee6a09c is described below

commit fc8ee6a09c4084262b2ba31968f677f3409746dd
Author: Praveen <praveenkchagan...@gmail.com>
AuthorDate: Wed Apr 23 13:42:22 2025 -0700

    Metrics for Segment Upload/Download (#15585)
    
    * Segment metrics
    
    * Upload guage
    
    * only controller metrics
    
    * cleanup
    
    * review comments 2
---
 .../pinot/common/metrics/ControllerGauge.java      | 11 ++-
 .../pinot/common/metrics/ControllerMeter.java      |  6 +-
 .../pinot/common/metrics/ControllerTimer.java      |  7 +-
 .../resources/LLCSegmentCompletionHandlers.java    | 10 +++
 .../PinotSegmentUploadDownloadRestletResource.java | 11 ++-
 .../controller/api/resources/ResourceUtils.java    | 90 +++++++++++++++++++++-
 .../pinot/controller/api/upload/ZKOperator.java    | 11 ++-
 7 files changed, 137 insertions(+), 9 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
index 3ae4db405a..51ff09387c 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
@@ -207,7 +207,16 @@ public enum ControllerGauge implements 
AbstractMetrics.Gauge {
   
PAUSELESS_SEGMENTS_IN_UNRECOVERABLE_ERROR_COUNT("pauselessSegmentsInUnrecoverableErrorCount",
 false),
 
   // ZK JUTE max buffer size in bytes
-  ZK_JUTE_MAX_BUFFER("zkJuteMaxBuffer", true);
+  ZK_JUTE_MAX_BUFFER("zkJuteMaxBuffer", true),
+
+  // Bytes to be read from deep store
+  DEEP_STORE_READ_BYTES_IN_PROGRESS("deepStoreReadBytesInProgress", true),
+  // Count of deep store segment downloads that are currently in progress
+  DEEP_STORE_READ_OPS_IN_PROGRESS("deepStoreReadOpsInProgress", true),
+  // Bytes to be written to deep store
+  DEEP_STORE_WRITE_BYTES_IN_PROGRESS("deepStoreWriteBytesInProgress", true),
+  // Count of deep store segment writes that are currently in progress
+  DEEP_STORE_WRITE_OPS_IN_PROGRESS("deepStoreWriteOpsInProgress", true);
 
   private final String _gaugeName;
   private final String _unit;
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
index 850588cb45..5f934561ad 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
@@ -69,7 +69,11 @@ public enum ControllerMeter implements AbstractMetrics.Meter 
{
   IDEAL_STATE_UPDATE_FAILURE("IdealStateUpdateFailure", false),
   IDEAL_STATE_UPDATE_RETRY("IdealStateUpdateRetry", false),
   IDEAL_STATE_UPDATE_SUCCESS("IdealStateUpdateSuccess", false),
-  SEGMENT_SIZE_AUTO_REDUCTION("SegmentSizeAutoReduction", false);
+  SEGMENT_SIZE_AUTO_REDUCTION("SegmentSizeAutoReduction", false),
+  // Total Bytes read from deep store
+  DEEP_STORE_READ_BYTES_COMPLETED("deepStoreReadBytesCompleted", true),
+  // Total Bytes written to deep store
+  DEEP_STORE_WRITE_BYTES_COMPLETED("deepStoreWriteBytesCompleted", true);
 
   private final String _brokerMeterName;
   private final String _unit;
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerTimer.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerTimer.java
index c56b46343c..23d4807627 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerTimer.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerTimer.java
@@ -30,7 +30,12 @@ public enum ControllerTimer implements AbstractMetrics.Timer 
{
   CRON_SCHEDULER_JOB_EXECUTION_TIME_MS("cronSchedulerJobExecutionTimeMs", 
false),
   IDEAL_STATE_UPDATE_TIME_MS("IdealStateUpdateTimeMs", false),
   // How long it took the server to start.
-  STARTUP_SUCCESS_DURATION_MS("startupSuccessDurationMs", true);
+  STARTUP_SUCCESS_DURATION_MS("startupSuccessDurationMs", true),
+  // Time taken to read the segment from deep store
+  DEEP_STORE_SEGMENT_READ_TIME_MS("deepStoreSegmentReadTimeMs", true),
+  // Time taken to write the segment to deep store
+  DEEP_STORE_SEGMENT_WRITE_TIME_MS("deepStoreSegmentWriteTimeMs", true);
+
 
   private final String _timerName;
   private final boolean _global;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
index 6e3011e666..6df9c1c63a 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
@@ -39,6 +39,7 @@ import javax.ws.rs.core.MediaType;
 import org.apache.commons.configuration2.ex.ConfigurationException;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.metrics.ControllerGauge;
+import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.common.utils.TarCompressionUtils;
@@ -71,6 +72,9 @@ public class LLCSegmentCompletionHandlers {
   @Inject
   SegmentCompletionManager _segmentCompletionManager;
 
+  @Inject
+  ControllerMetrics _controllerMetrics;
+
   @VisibleForTesting
   public static String getScheme() {
     return SCHEME;
@@ -227,7 +231,13 @@ public class LLCSegmentCompletionHandlers {
       URI segmentFileURI =
           
URIUtils.getUri(ControllerFilePathProvider.getInstance().getDataDirURI().toString(),
 rawTableName,
               
URIUtils.encode(SegmentCompletionUtils.generateTmpSegmentFileName(segmentName)));
+      // Emit metrics related to deep-store upload operation
+      long startTimeMs = System.currentTimeMillis();
+      long segmentSizeBytes = localTempFile.length();
+      ResourceUtils.emitPreSegmentUploadMetrics(_controllerMetrics, 
rawTableName, segmentSizeBytes);
       
PinotFSFactory.create(segmentFileURI.getScheme()).copyFromLocalFile(localTempFile,
 segmentFileURI);
+      ResourceUtils.emitPostSegmentUploadMetrics(_controllerMetrics, 
rawTableName, startTimeMs, segmentSizeBytes);
+
       SegmentCompletionProtocol.Response.Params responseParams = new 
SegmentCompletionProtocol.Response.Params()
           
.withStreamPartitionMsgOffset(requestParams.getStreamPartitionMsgOffset())
           .withSegmentLocation(segmentFileURI.toString())
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
index deada81d92..f160c5c60a 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
@@ -214,8 +214,15 @@ public class PinotSegmentUploadDownloadRestletResource {
       segmentFile =
           
org.apache.pinot.common.utils.FileUtils.concatAndValidateFile(tableDir, 
segmentName + "-" + UUID.randomUUID(),
               "Invalid segment name: %s", segmentName);
-
+      String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+      // Emit metrics related to deep-store download operation
+      long deepStoreDownloadStartTimeMs = System.currentTimeMillis();
+      long segmentSizeInBytes = segmentFile.length();
+      ResourceUtils.emitPreSegmentDownloadMetrics(_controllerMetrics, 
rawTableName, segmentSizeInBytes);
       pinotFS.copyToLocalFile(remoteSegmentFileURI, segmentFile);
+      ResourceUtils.emitPostSegmentDownloadMetrics(_controllerMetrics, 
rawTableName,
+          System.currentTimeMillis() - deepStoreDownloadStartTimeMs, 
segmentSizeInBytes);
+
       // Streaming in the tmp file and delete it afterward.
       builder.entity((StreamingOutput) output -> {
         try {
@@ -233,6 +240,7 @@ public class PinotSegmentUploadDownloadRestletResource {
   private SuccessResponse uploadSegment(@Nullable String tableName, TableType 
tableType,
       @Nullable FormDataMultiPart multiPart, boolean 
copySegmentToFinalLocation, boolean enableParallelPushProtection,
       boolean allowRefresh, HttpHeaders headers, Request request) {
+    long segmentUploadStartTimeMs = System.currentTimeMillis();
     if (StringUtils.isNotEmpty(tableName)) {
       TableType tableTypeFromTableName = 
TableNameBuilder.getTableTypeFromTableName(tableName);
       if (tableTypeFromTableName != null && tableTypeFromTableName != 
tableType) {
@@ -412,7 +420,6 @@ public class PinotSegmentUploadDownloadRestletResource {
       zkOperator.completeSegmentOperations(tableNameWithType, segmentMetadata, 
uploadType, finalSegmentLocationURI,
           segmentFile, sourceDownloadURIStr, segmentDownloadURIStr, 
crypterName, segmentSizeInBytes,
           enableParallelPushProtection, allowRefresh, headers);
-
       return new SuccessResponse("Successfully uploaded segment: " + 
segmentName + " of table: " + tableNameWithType);
     } catch (WebApplicationException e) {
       throw e;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ResourceUtils.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ResourceUtils.java
index ab091c8138..3d6fce7448 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ResourceUtils.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ResourceUtils.java
@@ -19,10 +19,16 @@
 package org.apache.pinot.controller.api.resources;
 
 import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.Response;
 import org.apache.pinot.common.exception.TableNotFoundException;
+import org.apache.pinot.common.metrics.ControllerGauge;
+import org.apache.pinot.common.metrics.ControllerMeter;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.metrics.ControllerTimer;
 import org.apache.pinot.controller.api.access.AccessControl;
 import org.apache.pinot.controller.api.access.AccessControlFactory;
 import org.apache.pinot.controller.api.access.AccessControlUtils;
@@ -34,13 +40,20 @@ import org.apache.pinot.spi.config.table.TableType;
 import org.glassfish.grizzly.http.server.Request;
 import org.slf4j.Logger;
 
-
 public class ResourceUtils {
+
   private ResourceUtils() {
   }
 
+  // Shared static variable
+  private static AtomicLong _deepStoreWriteOpsInProgress = new AtomicLong(0);
+  private static AtomicLong _deepStoreWriteBytesInProgress = new AtomicLong(0);
+  private static AtomicLong _deepStoreReadOpsInProgress = new AtomicLong(0);
+  private static AtomicLong _deepStoreReadBytesInProgress = new AtomicLong(0);
+
   public static List<String> 
getExistingTableNamesWithType(PinotHelixResourceManager 
pinotHelixResourceManager,
-      String tableName, @Nullable TableType tableType, Logger logger) {
+                                                           String tableName, 
@Nullable TableType tableType,
+                                                           Logger logger) {
     try {
       return 
pinotHelixResourceManager.getExistingTableNamesWithType(tableName, tableType);
     } catch (TableNotFoundException e) {
@@ -78,4 +91,77 @@ public class ResourceUtils {
       throw new ControllerApplicationException(logger, "Permission denied", 
Response.Status.FORBIDDEN);
     }
   }
+
+  public static void emitPreSegmentUploadMetrics(ControllerMetrics 
controllerMetrics, String rawTableName,
+                                                 long segmentSizeInBytes) {
+    long writeCount = _deepStoreWriteOpsInProgress.incrementAndGet();
+    controllerMetrics.setOrUpdateTableGauge(rawTableName, 
ControllerGauge.DEEP_STORE_WRITE_OPS_IN_PROGRESS,
+            writeCount);
+    
controllerMetrics.setValueOfGlobalGauge(ControllerGauge.DEEP_STORE_WRITE_OPS_IN_PROGRESS,
 writeCount);
+
+    long segmentBytesUploading = 
_deepStoreWriteBytesInProgress.addAndGet(segmentSizeInBytes);
+    controllerMetrics.setOrUpdateTableGauge(rawTableName, 
ControllerGauge.DEEP_STORE_WRITE_BYTES_IN_PROGRESS,
+            segmentBytesUploading);
+    
controllerMetrics.setValueOfGlobalGauge(ControllerGauge.DEEP_STORE_WRITE_BYTES_IN_PROGRESS,
 segmentBytesUploading);
+  }
+
+  public static void emitPostSegmentUploadMetrics(ControllerMetrics 
controllerMetrics, String rawTableName,
+                                                  long startTimeMs, long 
segmentSizeInBytes) {
+    long writeCount = _deepStoreWriteOpsInProgress.decrementAndGet();
+    controllerMetrics.setOrUpdateTableGauge(rawTableName, 
ControllerGauge.DEEP_STORE_WRITE_OPS_IN_PROGRESS,
+            writeCount);
+    
controllerMetrics.setValueOfGlobalGauge(ControllerGauge.DEEP_STORE_WRITE_OPS_IN_PROGRESS,
 writeCount);
+
+    long segmentBytesUploading = 
_deepStoreWriteBytesInProgress.addAndGet(-segmentSizeInBytes);
+    controllerMetrics.setOrUpdateTableGauge(rawTableName, 
ControllerGauge.DEEP_STORE_WRITE_OPS_IN_PROGRESS,
+            segmentBytesUploading);
+    
controllerMetrics.setValueOfGlobalGauge(ControllerGauge.DEEP_STORE_WRITE_OPS_IN_PROGRESS,
 segmentBytesUploading);
+
+    long durationMs = System.currentTimeMillis() - startTimeMs;
+    controllerMetrics.addTimedTableValue(rawTableName, 
ControllerTimer.DEEP_STORE_SEGMENT_WRITE_TIME_MS, durationMs,
+            TimeUnit.MILLISECONDS);
+    
controllerMetrics.addTimedValue(ControllerTimer.DEEP_STORE_SEGMENT_WRITE_TIME_MS,
 durationMs,
+            TimeUnit.MILLISECONDS);
+
+    controllerMetrics.addMeteredTableValue(rawTableName, 
ControllerMeter.DEEP_STORE_WRITE_BYTES_COMPLETED,
+            segmentSizeInBytes);
+    
controllerMetrics.addMeteredGlobalValue(ControllerMeter.DEEP_STORE_WRITE_BYTES_COMPLETED,
 segmentSizeInBytes);
+  }
+
+  public static void emitPreSegmentDownloadMetrics(ControllerMetrics 
controllerMetrics, String rawTableName,
+                                                   long segmentSizeInBytes) {
+    long readCount = _deepStoreReadOpsInProgress.incrementAndGet();
+    controllerMetrics.setOrUpdateTableGauge(rawTableName, 
ControllerGauge.DEEP_STORE_READ_OPS_IN_PROGRESS,
+            readCount);
+    
controllerMetrics.setValueOfGlobalGauge(ControllerGauge.DEEP_STORE_READ_OPS_IN_PROGRESS,
 readCount);
+
+    long segmentBytesDownloading = 
_deepStoreReadBytesInProgress.addAndGet(segmentSizeInBytes);
+    controllerMetrics.setOrUpdateTableGauge(rawTableName, 
ControllerGauge.DEEP_STORE_READ_BYTES_IN_PROGRESS,
+            segmentBytesDownloading);
+    
controllerMetrics.setValueOfGlobalGauge(ControllerGauge.DEEP_STORE_READ_BYTES_IN_PROGRESS,
+            segmentBytesDownloading);
+  }
+
+    public static void emitPostSegmentDownloadMetrics(ControllerMetrics 
controllerMetrics, String rawTableName,
+                                                        long startTimeMs, long 
segmentSizeInBytes) {
+      long readCount = _deepStoreReadOpsInProgress.decrementAndGet();
+      controllerMetrics.setOrUpdateTableGauge(rawTableName, 
ControllerGauge.DEEP_STORE_READ_OPS_IN_PROGRESS, readCount);
+      
controllerMetrics.setValueOfGlobalGauge(ControllerGauge.DEEP_STORE_READ_OPS_IN_PROGRESS,
 readCount);
+
+      long segmentBytesDownloading = 
_deepStoreReadBytesInProgress.addAndGet(-segmentSizeInBytes);
+      controllerMetrics.setOrUpdateTableGauge(rawTableName, 
ControllerGauge.DEEP_STORE_READ_BYTES_IN_PROGRESS,
+              segmentBytesDownloading);
+      
controllerMetrics.setValueOfGlobalGauge(ControllerGauge.DEEP_STORE_READ_BYTES_IN_PROGRESS,
+              segmentBytesDownloading);
+
+      long durationMs = System.currentTimeMillis() - startTimeMs;
+      controllerMetrics.addTimedTableValue(rawTableName, 
ControllerTimer.DEEP_STORE_SEGMENT_READ_TIME_MS,
+              durationMs, TimeUnit.MILLISECONDS);
+      
controllerMetrics.addTimedValue(ControllerTimer.DEEP_STORE_SEGMENT_READ_TIME_MS,
 durationMs,
+              TimeUnit.MILLISECONDS);
+
+      controllerMetrics.addMeteredTableValue(rawTableName, 
ControllerMeter.DEEP_STORE_READ_BYTES_COMPLETED,
+              segmentSizeInBytes);
+      
controllerMetrics.addMeteredGlobalValue(ControllerMeter.DEEP_STORE_READ_BYTES_COMPLETED,
 segmentSizeInBytes);
+    }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
index 8486853650..8ff39b9def 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
@@ -40,10 +40,12 @@ import 
org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.common.utils.FileUploadDownloadClient.FileUploadType;
 import org.apache.pinot.controller.ControllerConf;
 import 
org.apache.pinot.controller.api.exception.ControllerApplicationException;
+import org.apache.pinot.controller.api.resources.ResourceUtils;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
 import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -771,16 +773,21 @@ public class ZKOperator {
     } else {
       // In push types other than METADATA, local segmentFile contains the 
complete segment.
       // Move local segment to final location
-      copyFromSegmentFileToDeepStore(segmentFile, finalSegmentLocationURI);
+      copyFromSegmentFileToDeepStore(segmentFile, finalSegmentLocationURI, 
tableNameWithType);
       LOGGER.info("Copied segment: {} of table: {} to final location: {}", 
segmentName, tableNameWithType,
           finalSegmentLocationURI);
     }
   }
 
-  private void copyFromSegmentFileToDeepStore(File segmentFile, URI 
finalSegmentLocationURI)
+  private void copyFromSegmentFileToDeepStore(File segmentFile, URI 
finalSegmentLocationURI, String tableNameWithType)
       throws Exception {
     LOGGER.info("Copying segment from: {} to: {}", 
segmentFile.getAbsolutePath(), finalSegmentLocationURI);
+    String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+    long segmentSizeInBytes = segmentFile.length();
+    long startTimeMs = System.currentTimeMillis();
+    ResourceUtils.emitPreSegmentUploadMetrics(_controllerMetrics, 
rawTableName, segmentSizeInBytes);
     
PinotFSFactory.create(finalSegmentLocationURI.getScheme()).copyFromLocalFile(segmentFile,
 finalSegmentLocationURI);
+    ResourceUtils.emitPostSegmentUploadMetrics(_controllerMetrics, 
rawTableName, startTimeMs, segmentSizeInBytes);
   }
 
   private void copyFromSegmentURIToDeepStore(URI sourceDownloadURI, URI 
finalSegmentLocationURI)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to