This is an automated email from the ASF dual-hosted git repository.

tingchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new bcdb053af3 Calculate size based flush threshold per topic (#14765) bcdb053af3 is described below commit bcdb053af3319b191348caa0fb095561344eb0e6 Author: Christopher Peck <27231838+itschrisp...@users.noreply.github.com> AuthorDate: Fri Feb 7 13:56:11 2025 -0800 Calculate size based flush threshold per topic (#14765) * calculate size based flush threshold per topic * emit duplicate metrics for backwards compatibility --- .../pinot/common/metrics/ControllerGauge.java | 4 ++++ .../realtime/PinotLLCRealtimeSegmentManager.java | 3 +-- .../segment/FlushThresholdUpdateManager.java | 28 +++++++++++++++------- .../SegmentSizeBasedFlushThresholdUpdater.java | 11 ++++++++- .../segment/FlushThresholdUpdaterTest.java | 20 +++++++++------- 5 files changed, 45 insertions(+), 21 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java index 99bd892066..475f128208 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java @@ -174,11 +174,15 @@ public enum ControllerGauge implements AbstractMetrics.Gauge { // any partition in the realtime table. This metric is emitted from the segment size based threshold // computer. NUM_ROWS_THRESHOLD("numRowsThreshold", false), + // Added to preserve backwards compatibility of the above metric + NUM_ROWS_THRESHOLD_WITH_TOPIC("numRowsThresholdWithTopic", false), // The actual segment size for committing segments. These may be shorter than expected when the administrator // issues a force-commit, or zero when new partitions are detected in the stream (since there is no completing // segment when the partition is first detected). 
COMMITTING_SEGMENT_SIZE("committingSegmentSize", false), + // Added to preserve backwards compatibility of the above metric + COMMITTING_SEGMENT_SIZE_WITH_TOPIC("committingSegmentSizeWithTopic", false), TABLE_REBALANCE_IN_PROGRESS("tableRebalanceInProgress", false); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index ce11aae1c0..e296136975 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -326,11 +326,10 @@ public class PinotLLCRealtimeSegmentManager { String realtimeTableName = tableConfig.getTableName(); LOGGER.info("Setting up new LLC table: {}", realtimeTableName); - _flushThresholdUpdateManager.clearFlushThresholdUpdater(realtimeTableName); - List<StreamConfig> streamConfigs = IngestionConfigUtils.getStreamConfigMaps(tableConfig).stream().map( streamConfig -> new StreamConfig(tableConfig.getTableName(), streamConfig) ).collect(Collectors.toList()); + streamConfigs.forEach(_flushThresholdUpdateManager::clearFlushThresholdUpdater); InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig); List<PartitionGroupMetadata> newPartitionGroupMetadataList = getNewPartitionGroupMetadataList(streamConfigs, Collections.emptyList()); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdateManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdateManager.java index e72971c5e0..b4476e3bac 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdateManager.java +++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdateManager.java @@ -18,13 +18,14 @@ */ package org.apache.pinot.controller.helix.core.realtime.segment; +import com.google.common.annotations.VisibleForTesting; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.pinot.spi.stream.StreamConfig; /** - * Manager which maintains the flush threshold update objects for each table + * Manager which maintains the flush threshold update objects for each (table, topic) pair */ public class FlushThresholdUpdateManager { private final ConcurrentMap<String, FlushThresholdUpdater> _flushThresholdUpdaterMap = new ConcurrentHashMap<>(); @@ -45,30 +46,39 @@ public class FlushThresholdUpdateManager { * partitions consumed by a server; FixedFlushThresholdUpdater sets the actual segment flush threshold as is. */ public FlushThresholdUpdater getFlushThresholdUpdater(StreamConfig streamConfig) { + String tableTopicKey = getKey(streamConfig); String realtimeTableName = streamConfig.getTableNameWithType(); - int flushThresholdRows = streamConfig.getFlushThresholdRows(); if (flushThresholdRows > 0) { - _flushThresholdUpdaterMap.remove(realtimeTableName); + _flushThresholdUpdaterMap.remove(tableTopicKey); return new DefaultFlushThresholdUpdater(flushThresholdRows); } int flushThresholdSegmentRows = streamConfig.getFlushThresholdSegmentRows(); if (flushThresholdSegmentRows > 0) { - _flushThresholdUpdaterMap.remove(realtimeTableName); + _flushThresholdUpdaterMap.remove(tableTopicKey); return new FixedFlushThresholdUpdater(flushThresholdSegmentRows); } // Legacy behavior: when flush threshold rows is explicitly set to 0, use segment size based flush threshold long flushThresholdSegmentSizeBytes = streamConfig.getFlushThresholdSegmentSizeBytes(); if (flushThresholdRows == 0 || flushThresholdSegmentSizeBytes > 0) { - return _flushThresholdUpdaterMap.computeIfAbsent(realtimeTableName, - 
k -> new SegmentSizeBasedFlushThresholdUpdater(realtimeTableName)); + return _flushThresholdUpdaterMap.computeIfAbsent(tableTopicKey, + k -> new SegmentSizeBasedFlushThresholdUpdater(realtimeTableName, streamConfig.getTopicName())); } else { - _flushThresholdUpdaterMap.remove(realtimeTableName); + _flushThresholdUpdaterMap.remove(tableTopicKey); return new DefaultFlushThresholdUpdater(StreamConfig.DEFAULT_FLUSH_THRESHOLD_ROWS); } } - public void clearFlushThresholdUpdater(String realtimeTableName) { - _flushThresholdUpdaterMap.remove(realtimeTableName); + public void clearFlushThresholdUpdater(StreamConfig streamConfig) { + _flushThresholdUpdaterMap.remove(getKey(streamConfig)); + } + + private String getKey(StreamConfig streamConfig) { + return streamConfig.getTableNameWithType() + "," + streamConfig.getTopicName(); + } + + @VisibleForTesting + public int getFlushThresholdUpdaterMapSize() { + return _flushThresholdUpdaterMap.size(); } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java index 717a95bde3..1d9d3bd90d 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java @@ -39,12 +39,14 @@ public class SegmentSizeBasedFlushThresholdUpdater implements FlushThresholdUpda public static final Logger LOGGER = LoggerFactory.getLogger(SegmentSizeBasedFlushThresholdUpdater.class); private final SegmentFlushThresholdComputer _flushThresholdComputer; private final String _realtimeTableName; + private final String _topicName; private final ControllerMetrics _controllerMetrics = ControllerMetrics.get(); - public 
SegmentSizeBasedFlushThresholdUpdater(String realtimeTableName) { + public SegmentSizeBasedFlushThresholdUpdater(String realtimeTableName, String topicName) { _flushThresholdComputer = new SegmentFlushThresholdComputer(); _realtimeTableName = realtimeTableName; + _topicName = topicName; } // synchronized since this method could be called for multiple partitions of the same table in different threads @@ -57,8 +59,15 @@ public class SegmentSizeBasedFlushThresholdUpdater implements FlushThresholdUpda newSegmentZKMetadata.getSegmentName()); newSegmentZKMetadata.setSizeThresholdToFlushSegment(threshold); + // metrics tagged with table only _controllerMetrics.setOrUpdateTableGauge(_realtimeTableName, ControllerGauge.NUM_ROWS_THRESHOLD, threshold); _controllerMetrics.setOrUpdateTableGauge(_realtimeTableName, ControllerGauge.COMMITTING_SEGMENT_SIZE, committingSegmentDescriptor.getSegmentSizeBytes()); + + // metrics tagged with topic and table + _controllerMetrics.setOrUpdateTableGauge(_realtimeTableName, _topicName, + ControllerGauge.NUM_ROWS_THRESHOLD_WITH_TOPIC, threshold); + _controllerMetrics.setOrUpdateTableGauge(_realtimeTableName, _topicName, + ControllerGauge.COMMITTING_SEGMENT_SIZE_WITH_TOPIC, committingSegmentDescriptor.getSegmentSizeBytes()); } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java index 712f80078b..2d45753015 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java @@ -90,7 +90,9 @@ public class FlushThresholdUpdaterTest { segmentBasedflushThresholdUpdater = flushThresholdUpdater; // Clear the updater - 
flushThresholdUpdateManager.clearFlushThresholdUpdater(REALTIME_TABLE_NAME); + assertEquals(flushThresholdUpdateManager.getFlushThresholdUpdaterMapSize(), 1); + flushThresholdUpdateManager.clearFlushThresholdUpdater(mockStreamConfig(0, -1, -1)); + assertEquals(flushThresholdUpdateManager.getFlushThresholdUpdaterMapSize(), 0); // Call again with flush threshold rows set to 0 - a different Object should be returned flushThresholdUpdater = flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(0, -1, -1)); @@ -140,7 +142,7 @@ public class FlushThresholdUpdaterTest { for (long[] segmentSizesMB : Arrays.asList(EXPONENTIAL_GROWTH_SEGMENT_SIZES_MB, LOGARITHMIC_GROWTH_SEGMENT_SIZES_MB, STEPS_SEGMENT_SIZES_MB)) { SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater = - new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME); + new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME, streamConfig.getTopicName()); // Start consumption SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(0); @@ -178,7 +180,7 @@ public class FlushThresholdUpdaterTest { for (long[] segmentSizesMB : Arrays.asList(EXPONENTIAL_GROWTH_SEGMENT_SIZES_MB, LOGARITHMIC_GROWTH_SEGMENT_SIZES_MB, STEPS_SEGMENT_SIZES_MB)) { SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater = - new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME); + new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME, streamConfig.getTopicName()); // Start consumption SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(1); @@ -238,9 +240,9 @@ public class FlushThresholdUpdaterTest { @Test public void testTimeThreshold() { - SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater = - new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME); StreamConfig streamConfig = mockDefaultAutotuneStreamConfig(); + SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater = + new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME, 
streamConfig.getTopicName()); // Start consumption SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(0); @@ -272,9 +274,9 @@ public class FlushThresholdUpdaterTest { @Test public void testMinThreshold() { - SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater = - new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME); StreamConfig streamConfig = mockDefaultAutotuneStreamConfig(); + SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater = + new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME, streamConfig.getTopicName()); // Start consumption SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(0); @@ -305,8 +307,6 @@ public class FlushThresholdUpdaterTest { @Test public void testSegmentSizeBasedUpdaterWithModifications() { - SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater - = new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME); // Use customized stream config long flushSegmentDesiredSizeBytes = StreamConfig.DEFAULT_FLUSH_THRESHOLD_SEGMENT_SIZE_BYTES / 2; @@ -314,6 +314,8 @@ public class FlushThresholdUpdaterTest { int flushAutotuneInitialRows = StreamConfig.DEFAULT_FLUSH_AUTOTUNE_INITIAL_ROWS / 2; StreamConfig streamConfig = mockAutotuneStreamConfig(flushSegmentDesiredSizeBytes, flushThresholdTimeMillis, flushAutotuneInitialRows); + SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater + = new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME, streamConfig.getTopicName()); // Start consumption SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(0); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org