This is an automated email from the ASF dual-hosted git repository. mcvsubbu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new b9965c8a1c Realtime segment size threshold metrics (#14485) b9965c8a1c is described below commit b9965c8a1c84e277e13ad5bfd7d5ead40550c5c0 Author: Subbu Subramaniam <mcvsu...@users.noreply.github.com> AuthorDate: Tue Nov 19 10:23:22 2024 -0800 Realtime segment size threshold metrics (#14485) * Realtime segment size threshold metrics * Addressed PR comment --- .../org/apache/pinot/common/metrics/ControllerGauge.java | 10 ++++++++++ .../realtime/segment/FlushThresholdUpdateManager.java | 2 +- .../segment/SegmentSizeBasedFlushThresholdUpdater.java | 13 ++++++++++++- .../core/realtime/segment/FlushThresholdUpdaterTest.java | 15 ++++++++++----- 4 files changed, 33 insertions(+), 7 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java index d052a75485..cdb99f0f90 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java @@ -165,6 +165,16 @@ public enum ControllerGauge implements AbstractMetrics.Gauge { TABLE_DISABLED("tableDisabled", false), + // A per-table metric that shows the number of rows we expect to consume for the next segment of + // any partition in the realtime table. This metric is emitted from the segment size based threshold + // computer. + NUM_ROWS_THRESHOLD("numRowsThreshold", false), + + // The actual segment size for committing segments. These may be shorter than expected when the administrator + // issues a force-commit, or zero when new partitions are detected in the stream (since there is no completing + // segment when the partition is first detected). 
+ COMMITTING_SEGMENT_SIZE("committingSegmentSize", false), + TABLE_REBALANCE_IN_PROGRESS("tableRebalanceInProgress", false); private final String _gaugeName; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdateManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdateManager.java index 0076434b66..e72971c5e0 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdateManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdateManager.java @@ -61,7 +61,7 @@ public class FlushThresholdUpdateManager { long flushThresholdSegmentSizeBytes = streamConfig.getFlushThresholdSegmentSizeBytes(); if (flushThresholdRows == 0 || flushThresholdSegmentSizeBytes > 0) { return _flushThresholdUpdaterMap.computeIfAbsent(realtimeTableName, - k -> new SegmentSizeBasedFlushThresholdUpdater()); + k -> new SegmentSizeBasedFlushThresholdUpdater(realtimeTableName)); } else { _flushThresholdUpdaterMap.remove(realtimeTableName); return new DefaultFlushThresholdUpdater(StreamConfig.DEFAULT_FLUSH_THRESHOLD_ROWS); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java index 511e85651b..546dfb1097 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java @@ -20,7 +20,10 @@ package org.apache.pinot.controller.helix.core.realtime.segment; import javax.annotation.Nullable; import 
org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.metrics.ControllerGauge; +import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,9 +39,13 @@ import org.slf4j.LoggerFactory; public class SegmentSizeBasedFlushThresholdUpdater implements FlushThresholdUpdater { public static final Logger LOGGER = LoggerFactory.getLogger(SegmentSizeBasedFlushThresholdUpdater.class); private final SegmentFlushThresholdComputer _flushThresholdComputer; + private final String _rawTableName; - public SegmentSizeBasedFlushThresholdUpdater() { + private final ControllerMetrics _controllerMetrics = ControllerMetrics.get(); + + public SegmentSizeBasedFlushThresholdUpdater(String realtimeTableName) { _flushThresholdComputer = new SegmentFlushThresholdComputer(); + _rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName); } // synchronized since this method could be called for multiple partitions of the same table in different threads @@ -50,5 +57,9 @@ public class SegmentSizeBasedFlushThresholdUpdater implements FlushThresholdUpda _flushThresholdComputer.computeThreshold(streamConfig, committingSegmentDescriptor, committingSegmentZKMetadata, newSegmentZKMetadata.getSegmentName()); newSegmentZKMetadata.setSizeThresholdToFlushSegment(threshold); + + _controllerMetrics.setOrUpdateTableGauge(_rawTableName, ControllerGauge.NUM_ROWS_THRESHOLD, threshold); + _controllerMetrics.setOrUpdateTableGauge(_rawTableName, ControllerGauge.COMMITTING_SEGMENT_SIZE, + committingSegmentDescriptor.getSegmentSizeBytes()); } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java index 
9ae04827b6..712f80078b 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java @@ -139,7 +139,8 @@ public class FlushThresholdUpdaterTest { for (long[] segmentSizesMB : Arrays.asList(EXPONENTIAL_GROWTH_SEGMENT_SIZES_MB, LOGARITHMIC_GROWTH_SEGMENT_SIZES_MB, STEPS_SEGMENT_SIZES_MB)) { - SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater = new SegmentSizeBasedFlushThresholdUpdater(); + SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater = + new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME); // Start consumption SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(0); @@ -176,7 +177,8 @@ public class FlushThresholdUpdaterTest { for (long[] segmentSizesMB : Arrays.asList(EXPONENTIAL_GROWTH_SEGMENT_SIZES_MB, LOGARITHMIC_GROWTH_SEGMENT_SIZES_MB, STEPS_SEGMENT_SIZES_MB)) { - SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater = new SegmentSizeBasedFlushThresholdUpdater(); + SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater = + new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME); // Start consumption SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(1); @@ -236,7 +238,8 @@ public class FlushThresholdUpdaterTest { @Test public void testTimeThreshold() { - SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater = new SegmentSizeBasedFlushThresholdUpdater(); + SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater = + new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME); StreamConfig streamConfig = mockDefaultAutotuneStreamConfig(); // Start consumption @@ -269,7 +272,8 @@ public class FlushThresholdUpdaterTest { @Test public void testMinThreshold() { - SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater = new SegmentSizeBasedFlushThresholdUpdater(); + 
SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater = + new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME); StreamConfig streamConfig = mockDefaultAutotuneStreamConfig(); // Start consumption @@ -301,7 +305,8 @@ public class FlushThresholdUpdaterTest { @Test public void testSegmentSizeBasedUpdaterWithModifications() { - SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater = new SegmentSizeBasedFlushThresholdUpdater(); + SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater + = new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME); // Use customized stream config long flushSegmentDesiredSizeBytes = StreamConfig.DEFAULT_FLUSH_THRESHOLD_SEGMENT_SIZE_BYTES / 2; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org