This is an automated email from the ASF dual-hosted git repository.

mcvsubbu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b9965c8a1c Realtime segment size threshold metrics (#14485)
b9965c8a1c is described below

commit b9965c8a1c84e277e13ad5bfd7d5ead40550c5c0
Author: Subbu Subramaniam <mcvsu...@users.noreply.github.com>
AuthorDate: Tue Nov 19 10:23:22 2024 -0800

    Realtime segment size threshold metrics (#14485)
    
    * Realtime segment size threshold metrics
    
    * Addressed PR comment
---
 .../org/apache/pinot/common/metrics/ControllerGauge.java  | 10 ++++++++++
 .../realtime/segment/FlushThresholdUpdateManager.java     |  2 +-
 .../segment/SegmentSizeBasedFlushThresholdUpdater.java    | 13 ++++++++++++-
 .../core/realtime/segment/FlushThresholdUpdaterTest.java  | 15 ++++++++++-----
 4 files changed, 33 insertions(+), 7 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
index d052a75485..cdb99f0f90 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
@@ -165,6 +165,16 @@ public enum ControllerGauge implements 
AbstractMetrics.Gauge {
 
   TABLE_DISABLED("tableDisabled", false),
 
+  // A per-table metric that shows the number of rows we expect to consume for 
the next segment of
+  // any partition in the realtime table. This metric is emitted from the 
segment size based threshold
+  // computer.
+  NUM_ROWS_THRESHOLD("numRowsThreshold", false),
+
+  // The actual segment size (in bytes) for committing segments. These may be smaller 
than expected when the administrator
+  // issues a force-commit, or zero when new partitions are detected in the 
stream (since there is no completing
+  // segment when the partition is first detected).
+  COMMITTING_SEGMENT_SIZE("committingSegmentSize", false),
+
   TABLE_REBALANCE_IN_PROGRESS("tableRebalanceInProgress", false);
 
   private final String _gaugeName;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdateManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdateManager.java
index 0076434b66..e72971c5e0 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdateManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdateManager.java
@@ -61,7 +61,7 @@ public class FlushThresholdUpdateManager {
     long flushThresholdSegmentSizeBytes = 
streamConfig.getFlushThresholdSegmentSizeBytes();
     if (flushThresholdRows == 0 || flushThresholdSegmentSizeBytes > 0) {
       return _flushThresholdUpdaterMap.computeIfAbsent(realtimeTableName,
-          k -> new SegmentSizeBasedFlushThresholdUpdater());
+          k -> new SegmentSizeBasedFlushThresholdUpdater(realtimeTableName));
     } else {
       _flushThresholdUpdaterMap.remove(realtimeTableName);
       return new 
DefaultFlushThresholdUpdater(StreamConfig.DEFAULT_FLUSH_THRESHOLD_ROWS);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
index 511e85651b..546dfb1097 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
@@ -20,7 +20,10 @@ package 
org.apache.pinot.controller.helix.core.realtime.segment;
 
 import javax.annotation.Nullable;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.metrics.ControllerGauge;
+import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,9 +39,13 @@ import org.slf4j.LoggerFactory;
 public class SegmentSizeBasedFlushThresholdUpdater implements 
FlushThresholdUpdater {
   public static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentSizeBasedFlushThresholdUpdater.class);
   private final SegmentFlushThresholdComputer _flushThresholdComputer;
+  private final String _rawTableName;
 
-  public SegmentSizeBasedFlushThresholdUpdater() {
+  private final ControllerMetrics _controllerMetrics = ControllerMetrics.get();
+
+  public SegmentSizeBasedFlushThresholdUpdater(String realtimeTableName) {
     _flushThresholdComputer = new SegmentFlushThresholdComputer();
+    _rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
   }
 
   // synchronized since this method could be called for multiple partitions of 
the same table in different threads
@@ -50,5 +57,9 @@ public class SegmentSizeBasedFlushThresholdUpdater implements 
FlushThresholdUpda
         _flushThresholdComputer.computeThreshold(streamConfig, 
committingSegmentDescriptor, committingSegmentZKMetadata,
             newSegmentZKMetadata.getSegmentName());
     newSegmentZKMetadata.setSizeThresholdToFlushSegment(threshold);
+
+    _controllerMetrics.setOrUpdateTableGauge(_rawTableName, 
ControllerGauge.NUM_ROWS_THRESHOLD, threshold);
+    _controllerMetrics.setOrUpdateTableGauge(_rawTableName, 
ControllerGauge.COMMITTING_SEGMENT_SIZE,
+        committingSegmentDescriptor.getSegmentSizeBytes());
   }
 }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
index 9ae04827b6..712f80078b 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
@@ -139,7 +139,8 @@ public class FlushThresholdUpdaterTest {
 
     for (long[] segmentSizesMB : 
Arrays.asList(EXPONENTIAL_GROWTH_SEGMENT_SIZES_MB, 
LOGARITHMIC_GROWTH_SEGMENT_SIZES_MB,
         STEPS_SEGMENT_SIZES_MB)) {
-      SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater = new 
SegmentSizeBasedFlushThresholdUpdater();
+      SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater =
+          new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME);
 
       // Start consumption
       SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(0);
@@ -176,7 +177,8 @@ public class FlushThresholdUpdaterTest {
 
     for (long[] segmentSizesMB : 
Arrays.asList(EXPONENTIAL_GROWTH_SEGMENT_SIZES_MB, 
LOGARITHMIC_GROWTH_SEGMENT_SIZES_MB,
         STEPS_SEGMENT_SIZES_MB)) {
-      SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater = new 
SegmentSizeBasedFlushThresholdUpdater();
+      SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater =
+          new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME);
 
       // Start consumption
       SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(1);
@@ -236,7 +238,8 @@ public class FlushThresholdUpdaterTest {
 
   @Test
   public void testTimeThreshold() {
-    SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater = new 
SegmentSizeBasedFlushThresholdUpdater();
+    SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater =
+        new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME);
     StreamConfig streamConfig = mockDefaultAutotuneStreamConfig();
 
     // Start consumption
@@ -269,7 +272,8 @@ public class FlushThresholdUpdaterTest {
 
   @Test
   public void testMinThreshold() {
-    SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater = new 
SegmentSizeBasedFlushThresholdUpdater();
+    SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater =
+        new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME);
     StreamConfig streamConfig = mockDefaultAutotuneStreamConfig();
 
     // Start consumption
@@ -301,7 +305,8 @@ public class FlushThresholdUpdaterTest {
 
   @Test
   public void testSegmentSizeBasedUpdaterWithModifications() {
-    SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater = new 
SegmentSizeBasedFlushThresholdUpdater();
+    SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater
+        = new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME);
 
     // Use customized stream config
     long flushSegmentDesiredSizeBytes = 
StreamConfig.DEFAULT_FLUSH_THRESHOLD_SEGMENT_SIZE_BYTES / 2;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to