This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 5dd0c3a06d Adding metrics for pauseless observability (#15384)
5dd0c3a06d is described below

commit 5dd0c3a06d8fa67d6512a1f86ea8778b83d8e734
Author: 9aman <35227405+9a...@users.noreply.github.com>
AuthorDate: Fri Mar 28 14:59:32 2025 +0530

    Adding metrics for pauseless observability (#15384)
    
    * Adding metrics to signify:
    1. Pauseless is enabled/disabled on server and controller
    2. Build failures during the segment commit protocol
    
    * Adding metrics for re-ingestion failure
---
 .../org/apache/pinot/common/metrics/ControllerGauge.java  |  3 +++
 .../java/org/apache/pinot/common/metrics/ServerGauge.java |  2 ++
 .../java/org/apache/pinot/common/metrics/ServerMeter.java |  6 +++++-
 .../helix/core/realtime/SegmentCompletionManager.java     | 15 +++++++++++++--
 .../data/manager/realtime/RealtimeSegmentDataManager.java | 11 +++++++++++
 .../pinot/server/api/resources/ReingestionResource.java   |  3 +++
 6 files changed, 37 insertions(+), 3 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
index 6418aca8d1..62452adf40 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
@@ -194,6 +194,9 @@ public enum ControllerGauge implements 
AbstractMetrics.Gauge {
   // Metric used to track errors during the periodic table retention management
   RETENTION_MANAGER_ERROR("retentionManagerError", false),
 
+  // Gauge to reflect whether pauseless is enabled or not
+  PAUSELESS_CONSUMPTION_ENABLED("pauselessConsumptionEnabled", false),
+
   // Metric used to track when segments in error state are detected for 
pauseless table
   PAUSELESS_SEGMENTS_IN_ERROR_COUNT("pauselessSegmentsInErrorCount", false),
 
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
index 548ec15a07..bfe0ee1f3c 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
@@ -41,6 +41,8 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
   REALTIME_MERGED_TEXT_IDX_DOCUMENT_AVG_LEN("bytes", false),
   REALTIME_SEGMENT_NUM_PARTITIONS("realtimeSegmentNumPartitions", false),
   LLC_SIMULTANEOUS_SEGMENT_BUILDS("llcSimultaneousSegmentBuilds", true),
+  // Gauge to reflect whether pauseless is enabled or not
+  PAUSELESS_CONSUMPTION_ENABLED("pauselessConsumptionEnabled", false),
   // Upsert metrics
   UPSERT_PRIMARY_KEYS_COUNT("upsertPrimaryKeysCount", false),
   // Dedup metrics
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index d2aa0b3e55..35996ebb0b 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -98,6 +98,7 @@ public enum ServerMeter implements AbstractMetrics.Meter {
   SEGMENT_DOWNLOAD_FAILURES("segments", false),
   SEGMENT_DOWNLOAD_FROM_REMOTE_FAILURES("segments", false),
   SEGMENT_DOWNLOAD_FROM_PEERS_FAILURES("segments", false),
+  SEGMENT_BUILD_FAILURE("segments", false),
   SEGMENT_UPLOAD_FAILURE("segments", false),
   SEGMENT_UPLOAD_SUCCESS("segments", false),
   // Emitted only by Server to Deep-store segment uploader.
@@ -184,7 +185,10 @@ public enum ServerMeter implements AbstractMetrics.Meter {
   PREDOWNLOAD_SEGMENT_DOWNLOAD_COUNT("predownloadSegmentCount", true),
   PREDOWNLOAD_SEGMENT_DOWNLOAD_FAILURE_COUNT("predownloadSegmentFailureCount", 
true),
   PREDOWNLOAD_SUCCEED("predownloadSucceed", true),
-  PREDOWNLOAD_FAILED("predownloadFailed", true);
+  PREDOWNLOAD_FAILED("predownloadFailed", true),
+
+  // reingestion metrics
+  SEGMENT_REINGESTION_FAILURE("segments", false);
 
   private final String _meterName;
   private final String _unit;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
index 7f09efd2ee..f326399023 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import org.apache.helix.HelixManager;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
 import org.apache.pinot.common.utils.LLCSegmentName;
@@ -138,8 +139,18 @@ public class SegmentCompletionManager {
     }
 
     if (factoryName == null) {
-      factoryName = PauselessConsumptionUtils.isPauselessEnabled(tableConfig)
-          ? _segmentCompletionConfig.getDefaultPauselessFsmScheme() : 
_segmentCompletionConfig.getDefaultFsmScheme();
+      // Create a metric identifier at partition level granularity, similar to 
server metrics
+      // in RealtimeSegmentValidationManager
+      String tableNameAndPartitionGroupId = realtimeTableName + "-" + 
llcSegmentName.getPartitionGroupId();
+      if (PauselessConsumptionUtils.isPauselessEnabled(tableConfig)) {
+        factoryName = _segmentCompletionConfig.getDefaultPauselessFsmScheme();
+        _controllerMetrics.setValueOfTableGauge(tableNameAndPartitionGroupId,
+            ControllerGauge.PAUSELESS_CONSUMPTION_ENABLED, 1);
+      } else {
+        factoryName = _segmentCompletionConfig.getDefaultFsmScheme();
+        _controllerMetrics.setValueOfTableGauge(tableNameAndPartitionGroupId,
+            ControllerGauge.PAUSELESS_CONSUMPTION_ENABLED, 0);
+      }
     }
 
     
Preconditions.checkState(SegmentCompletionFSMFactory.isFactoryTypeSupported(factoryName),
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 730055a89c..a5ac3c2e95 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -859,6 +859,7 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
                 // respectively.
                 // Refer to the PR for the new commit protocol: 
https://github.com/apache/pinot/pull/14741
                 if 
(PauselessConsumptionUtils.isPauselessEnabled(_tableConfig)) {
+                  _serverMetrics.setValueOfTableGauge(_clientId, 
ServerGauge.PAUSELESS_CONSUMPTION_ENABLED, 1);
                   if (!startSegmentCommit()) {
                     // If for any reason commit failed, we don't want to be in 
COMMITTING state when we hold.
                     // Change the state to HOLDING before looping around.
@@ -867,6 +868,8 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
                     hold();
                     break;
                   }
+                } else {
+                  _serverMetrics.setValueOfTableGauge(_clientId, 
ServerGauge.PAUSELESS_CONSUMPTION_ENABLED, 0);
                 }
                 long buildTimeSeconds = response.getBuildTimeSeconds();
                 buildSegmentForCommit(buildTimeSeconds * 1000L);
@@ -1102,6 +1105,7 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
       String errorMessage = "Interrupted while waiting for semaphore";
       _segmentLogger.error(errorMessage, e);
       _realtimeTableDataManager.addSegmentError(_segmentNameStr, new 
SegmentErrorInfo(now(), errorMessage, e));
+      _serverMetrics.addMeteredTableValue(_clientId, 
ServerMeter.SEGMENT_BUILD_FAILURE, 1);
       return null;
     }
     try {
@@ -1134,6 +1138,7 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
           // Precondition checks fail, the segment build would fail 
consistently
           _segmentBuildFailedWithDeterministicError = true;
         }
+        _serverMetrics.addMeteredTableValue(_clientId, 
ServerMeter.SEGMENT_BUILD_FAILURE, 1);
         return null;
       }
       final long buildTimeMillis = now() - lockAcquireTimeMillis;
@@ -1155,6 +1160,7 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
             + indexDir;
         _segmentLogger.error(errorMessage, e);
         _realtimeTableDataManager.addSegmentError(_segmentNameStr, new 
SegmentErrorInfo(now(), errorMessage, e));
+        _serverMetrics.addMeteredTableValue(_clientId, 
ServerMeter.SEGMENT_BUILD_FAILURE, 1);
         return null;
       } finally {
         FileUtils.deleteQuietly(tempSegmentFolder);
@@ -1175,6 +1181,7 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
               + segmentTarFile;
           _segmentLogger.error(errorMessage, e);
           _realtimeTableDataManager.addSegmentError(_segmentNameStr, new 
SegmentErrorInfo(now(), errorMessage, e));
+          _serverMetrics.addMeteredTableValue(_clientId, 
ServerMeter.SEGMENT_BUILD_FAILURE, 1);
           return null;
         }
 
@@ -1184,6 +1191,7 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
               + " under index directory: " + indexDir;
           _segmentLogger.error(errorMessage);
           _realtimeTableDataManager.addSegmentError(_segmentNameStr, new 
SegmentErrorInfo(now(), errorMessage, null));
+          _serverMetrics.addMeteredTableValue(_clientId, 
ServerMeter.SEGMENT_BUILD_FAILURE, 1);
           return null;
         }
         File creationMetaFile = 
SegmentDirectoryPaths.findCreationMetaFile(indexDir);
@@ -1192,6 +1200,7 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
               + indexDir;
           _segmentLogger.error(errorMessage);
           _realtimeTableDataManager.addSegmentError(_segmentNameStr, new 
SegmentErrorInfo(now(), errorMessage, null));
+          _serverMetrics.addMeteredTableValue(_clientId, 
ServerMeter.SEGMENT_BUILD_FAILURE, 1);
           return null;
         }
         Map<String, File> metadataFiles = new HashMap<>();
@@ -1322,6 +1331,8 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
   private void cleanupMetrics() {
     _serverMetrics.removeTableGauge(_clientId, 
ServerGauge.LLC_PARTITION_CONSUMING);
     _serverMetrics.removeTableGauge(_clientId, ServerGauge.STREAM_DATA_LOSS);
+    _serverMetrics.removeTableGauge(_clientId, 
ServerGauge.PAUSELESS_CONSUMPTION_ENABLED);
+    _serverMetrics.removeTableMeter(_clientId, 
ServerMeter.SEGMENT_BUILD_FAILURE);
   }
 
   protected void hold()
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/ReingestionResource.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/ReingestionResource.java
index c137b52e4c..9b1c93e4de 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/ReingestionResource.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/ReingestionResource.java
@@ -52,6 +52,7 @@ import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
 import 
org.apache.pinot.segment.local.realtime.writer.StatelessRealtimeSegmentWriter;
@@ -210,6 +211,8 @@ public class ReingestionResource {
             tableDataManager.getSegmentBuildSemaphore());
       } catch (Exception e) {
         LOGGER.error("Error during async re-ingestion for job {} 
(segment={})", jobId, segmentName, e);
+        _serverInstance.getServerMetrics()
+            .addMeteredTableValue(realtimeTableName, 
ServerMeter.SEGMENT_REINGESTION_FAILURE, 1);
       } finally {
         _runningJobs.remove(jobId);
         _reingestingSegments.remove(segmentName);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to