This is an automated email from the ASF dual-hosted git repository.

tingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new bcdb053af3 Calculate size based flush threshold per topic (#14765)
bcdb053af3 is described below

commit bcdb053af3319b191348caa0fb095561344eb0e6
Author: Christopher Peck <27231838+itschrisp...@users.noreply.github.com>
AuthorDate: Fri Feb 7 13:56:11 2025 -0800

    Calculate size based flush threshold per topic (#14765)
    
    * calculate size based flush threshold per topic
    
    * emit duplicate metrics for backwards compatibility
---
 .../pinot/common/metrics/ControllerGauge.java      |  4 ++++
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  3 +--
 .../segment/FlushThresholdUpdateManager.java       | 28 +++++++++++++++-------
 .../SegmentSizeBasedFlushThresholdUpdater.java     | 11 ++++++++-
 .../segment/FlushThresholdUpdaterTest.java         | 20 +++++++++-------
 5 files changed, 45 insertions(+), 21 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
index 99bd892066..475f128208 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
@@ -174,11 +174,15 @@ public enum ControllerGauge implements AbstractMetrics.Gauge {
   // any partition in the realtime table. This metric is emitted from the 
segment size based threshold
   // computer.
   NUM_ROWS_THRESHOLD("numRowsThreshold", false),
+  // Added to preserve backwards compatibility of the above metric
+  NUM_ROWS_THRESHOLD_WITH_TOPIC("numRowsThresholdWithTopic", false),
 
   // The actual segment size for committing segments. These may be shorter 
than expected when the administrator
   // issues a force-commit, or zero when new partitions are detected in the 
stream (since there is no completing
   // segment when the partition is first detected).
   COMMITTING_SEGMENT_SIZE("committingSegmentSize", false),
+  // Added to preserve backwards compatibility of the above metric
+  COMMITTING_SEGMENT_SIZE_WITH_TOPIC("committingSegmentSizeWithTopic", false),
 
   TABLE_REBALANCE_IN_PROGRESS("tableRebalanceInProgress", false);
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index ce11aae1c0..e296136975 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -326,11 +326,10 @@ public class PinotLLCRealtimeSegmentManager {
     String realtimeTableName = tableConfig.getTableName();
     LOGGER.info("Setting up new LLC table: {}", realtimeTableName);
 
-    _flushThresholdUpdateManager.clearFlushThresholdUpdater(realtimeTableName);
-
     List<StreamConfig> streamConfigs = 
IngestionConfigUtils.getStreamConfigMaps(tableConfig).stream().map(
         streamConfig -> new StreamConfig(tableConfig.getTableName(), 
streamConfig)
     ).collect(Collectors.toList());
+    
streamConfigs.forEach(_flushThresholdUpdateManager::clearFlushThresholdUpdater);
     InstancePartitions instancePartitions = 
getConsumingInstancePartitions(tableConfig);
     List<PartitionGroupMetadata> newPartitionGroupMetadataList =
         getNewPartitionGroupMetadataList(streamConfigs, 
Collections.emptyList());
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdateManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdateManager.java
index e72971c5e0..b4476e3bac 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdateManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdateManager.java
@@ -18,13 +18,14 @@
  */
 package org.apache.pinot.controller.helix.core.realtime.segment;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.pinot.spi.stream.StreamConfig;
 
 
 /**
- * Manager which maintains the flush threshold update objects for each table
+ * Manager which maintains the flush threshold update objects for each (table, 
topic) pair
  */
 public class FlushThresholdUpdateManager {
   private final ConcurrentMap<String, FlushThresholdUpdater> 
_flushThresholdUpdaterMap = new ConcurrentHashMap<>();
@@ -45,30 +46,39 @@ public class FlushThresholdUpdateManager {
    * partitions consumed by a server; FixedFlushThresholdUpdater sets the 
actual segment flush threshold as is.
    */
   public FlushThresholdUpdater getFlushThresholdUpdater(StreamConfig 
streamConfig) {
+    String tableTopicKey = getKey(streamConfig);
     String realtimeTableName = streamConfig.getTableNameWithType();
-
     int flushThresholdRows = streamConfig.getFlushThresholdRows();
     if (flushThresholdRows > 0) {
-      _flushThresholdUpdaterMap.remove(realtimeTableName);
+      _flushThresholdUpdaterMap.remove(tableTopicKey);
       return new DefaultFlushThresholdUpdater(flushThresholdRows);
     }
     int flushThresholdSegmentRows = 
streamConfig.getFlushThresholdSegmentRows();
     if (flushThresholdSegmentRows > 0) {
-      _flushThresholdUpdaterMap.remove(realtimeTableName);
+      _flushThresholdUpdaterMap.remove(tableTopicKey);
       return new FixedFlushThresholdUpdater(flushThresholdSegmentRows);
     }
     // Legacy behavior: when flush threshold rows is explicitly set to 0, use 
segment size based flush threshold
     long flushThresholdSegmentSizeBytes = 
streamConfig.getFlushThresholdSegmentSizeBytes();
     if (flushThresholdRows == 0 || flushThresholdSegmentSizeBytes > 0) {
-      return _flushThresholdUpdaterMap.computeIfAbsent(realtimeTableName,
-          k -> new SegmentSizeBasedFlushThresholdUpdater(realtimeTableName));
+      return _flushThresholdUpdaterMap.computeIfAbsent(tableTopicKey,
+          k -> new SegmentSizeBasedFlushThresholdUpdater(realtimeTableName, 
streamConfig.getTopicName()));
     } else {
-      _flushThresholdUpdaterMap.remove(realtimeTableName);
+      _flushThresholdUpdaterMap.remove(tableTopicKey);
       return new 
DefaultFlushThresholdUpdater(StreamConfig.DEFAULT_FLUSH_THRESHOLD_ROWS);
     }
   }
 
-  public void clearFlushThresholdUpdater(String realtimeTableName) {
-    _flushThresholdUpdaterMap.remove(realtimeTableName);
+  public void clearFlushThresholdUpdater(StreamConfig streamConfig) {
+    _flushThresholdUpdaterMap.remove(getKey(streamConfig));
+  }
+
+  private String getKey(StreamConfig streamConfig) {
+    return streamConfig.getTableNameWithType() + "," + 
streamConfig.getTopicName();
+  }
+
+  @VisibleForTesting
+  public int getFlushThresholdUpdaterMapSize() {
+    return _flushThresholdUpdaterMap.size();
   }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
index 717a95bde3..1d9d3bd90d 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
@@ -39,12 +39,14 @@ public class SegmentSizeBasedFlushThresholdUpdater implements FlushThresholdUpda
   public static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentSizeBasedFlushThresholdUpdater.class);
   private final SegmentFlushThresholdComputer _flushThresholdComputer;
   private final String _realtimeTableName;
+  private final String _topicName;
 
   private final ControllerMetrics _controllerMetrics = ControllerMetrics.get();
 
-  public SegmentSizeBasedFlushThresholdUpdater(String realtimeTableName) {
+  public SegmentSizeBasedFlushThresholdUpdater(String realtimeTableName, 
String topicName) {
     _flushThresholdComputer = new SegmentFlushThresholdComputer();
     _realtimeTableName = realtimeTableName;
+    _topicName = topicName;
   }
 
   // synchronized since this method could be called for multiple partitions of 
the same table in different threads
@@ -57,8 +59,15 @@ public class SegmentSizeBasedFlushThresholdUpdater implements FlushThresholdUpda
             newSegmentZKMetadata.getSegmentName());
     newSegmentZKMetadata.setSizeThresholdToFlushSegment(threshold);
 
+    // metrics tagged with table only
     _controllerMetrics.setOrUpdateTableGauge(_realtimeTableName, 
ControllerGauge.NUM_ROWS_THRESHOLD, threshold);
     _controllerMetrics.setOrUpdateTableGauge(_realtimeTableName, 
ControllerGauge.COMMITTING_SEGMENT_SIZE,
         committingSegmentDescriptor.getSegmentSizeBytes());
+
+    // metrics tagged with topic and table
+    _controllerMetrics.setOrUpdateTableGauge(_realtimeTableName, _topicName,
+        ControllerGauge.NUM_ROWS_THRESHOLD_WITH_TOPIC, threshold);
+    _controllerMetrics.setOrUpdateTableGauge(_realtimeTableName, _topicName,
+        ControllerGauge.COMMITTING_SEGMENT_SIZE_WITH_TOPIC, 
committingSegmentDescriptor.getSegmentSizeBytes());
   }
 }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
index 712f80078b..2d45753015 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
@@ -90,7 +90,9 @@ public class FlushThresholdUpdaterTest {
     segmentBasedflushThresholdUpdater = flushThresholdUpdater;
 
     // Clear the updater
-    
flushThresholdUpdateManager.clearFlushThresholdUpdater(REALTIME_TABLE_NAME);
+    
assertEquals(flushThresholdUpdateManager.getFlushThresholdUpdaterMapSize(), 1);
+    flushThresholdUpdateManager.clearFlushThresholdUpdater(mockStreamConfig(0, 
-1, -1));
+    
assertEquals(flushThresholdUpdateManager.getFlushThresholdUpdaterMapSize(), 0);
 
     // Call again with flush threshold rows set to 0 - a different Object 
should be returned
     flushThresholdUpdater = 
flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(0, -1, 
-1));
@@ -140,7 +142,7 @@ public class FlushThresholdUpdaterTest {
     for (long[] segmentSizesMB : 
Arrays.asList(EXPONENTIAL_GROWTH_SEGMENT_SIZES_MB, 
LOGARITHMIC_GROWTH_SEGMENT_SIZES_MB,
         STEPS_SEGMENT_SIZES_MB)) {
       SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater =
-          new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME);
+          new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME, 
streamConfig.getTopicName());
 
       // Start consumption
       SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(0);
@@ -178,7 +180,7 @@ public class FlushThresholdUpdaterTest {
     for (long[] segmentSizesMB : 
Arrays.asList(EXPONENTIAL_GROWTH_SEGMENT_SIZES_MB, 
LOGARITHMIC_GROWTH_SEGMENT_SIZES_MB,
         STEPS_SEGMENT_SIZES_MB)) {
       SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater =
-          new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME);
+          new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME, 
streamConfig.getTopicName());
 
       // Start consumption
       SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(1);
@@ -238,9 +240,9 @@ public class FlushThresholdUpdaterTest {
 
   @Test
   public void testTimeThreshold() {
-    SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater =
-        new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME);
     StreamConfig streamConfig = mockDefaultAutotuneStreamConfig();
+    SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater =
+        new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME, 
streamConfig.getTopicName());
 
     // Start consumption
     SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(0);
@@ -272,9 +274,9 @@ public class FlushThresholdUpdaterTest {
 
   @Test
   public void testMinThreshold() {
-    SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater =
-        new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME);
     StreamConfig streamConfig = mockDefaultAutotuneStreamConfig();
+    SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater =
+        new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME, 
streamConfig.getTopicName());
 
     // Start consumption
     SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(0);
@@ -305,8 +307,6 @@ public class FlushThresholdUpdaterTest {
 
   @Test
   public void testSegmentSizeBasedUpdaterWithModifications() {
-    SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater
-        = new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME);
 
     // Use customized stream config
     long flushSegmentDesiredSizeBytes = 
StreamConfig.DEFAULT_FLUSH_THRESHOLD_SEGMENT_SIZE_BYTES / 2;
@@ -314,6 +314,8 @@ public class FlushThresholdUpdaterTest {
     int flushAutotuneInitialRows = 
StreamConfig.DEFAULT_FLUSH_AUTOTUNE_INITIAL_ROWS / 2;
     StreamConfig streamConfig =
         mockAutotuneStreamConfig(flushSegmentDesiredSizeBytes, 
flushThresholdTimeMillis, flushAutotuneInitialRows);
+    SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater
+        = new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME, 
streamConfig.getTopicName());
 
     // Start consumption
     SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(0);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to