This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f43664dd13 Support server level consumption throttle (#12292)
f43664dd13 is described below

commit f43664dd1384086be6669aa76a3e08c2db8a11e1
Author: Xiang Fu <xiangfu.1...@gmail.com>
AuthorDate: Tue Jan 23 22:34:36 2024 -0800

    Support server level consumption throttle (#12292)
---
 .../pinot/common/metrics/AbstractMetrics.java      |   6 +
 .../realtime/RealtimeConsumptionRateManager.java   |  39 ++-
 .../realtime/RealtimeSegmentDataManager.java       |  10 +-
 .../RealtimeConsumptionRateManagerTest.java        |  49 +++-
 ...nsumptionRateLimiterClusterIntegrationTest.java | 298 +++++++++++++++++++++
 .../server/starter/helix/BaseServerStarter.java    |   5 +-
 .../apache/pinot/spi/utils/CommonConstants.java    |   5 +
 7 files changed, 394 insertions(+), 18 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
index 6dccf789d1..456d6ecb14 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
@@ -337,6 +337,12 @@ public abstract class AbstractMetrics<QP extends 
AbstractMetrics.QueryPhase, M e
     return PinotMetricUtils.makePinotMeter(_metricsRegistry, metricName, 
meter.getUnit(), TimeUnit.SECONDS);
   }
 
+  public PinotMeter getMeteredValue(final M meter) {
+    final PinotMetricName metricName =
+        PinotMetricUtils.makePinotMetricName(_clazz, _metricPrefix + 
meter.getMeterName());
+    return PinotMetricUtils.makePinotMeter(_metricsRegistry, metricName, 
meter.getUnit(), TimeUnit.SECONDS);
+  }
+
   private String getTableFullMeterName(final String tableName, final M meter) {
     String meterName = meter.getMeterName();
     return _metricPrefix + getTableName(tableName) + "." + meterName;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java
index bc5d16e915..c866ce2654 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java
@@ -16,7 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.pinot.core.data.manager.realtime;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -31,19 +30,26 @@ import java.time.Instant;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.metrics.ServerGauge;
+import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamConsumerFactory;
 import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
 import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 /**
- * This class is responsible for creating realtime consumption rate limiters. 
The rate limit, specified in
- * StreamConfig of table config, is for the entire topic. The effective rate 
limit for each partition is simply the
- * specified rate limit divided by the partition count.
+ * This class is responsible for creating realtime consumption rate limiters.
+ * It contains one rate limiter for the entire server and multiple table 
partition level rate limiters.
+ * The server rate limiter throttles the overall consumption rate of the 
server and is configured via
+ * cluster or server config.
+ * For table partition level rate limiters, the rate limit value specified in 
StreamConfig of table config is for the
+ * entire topic. The effective rate limit for each partition is simply the 
specified rate limit divided by the
+ * partition count.
  * This class leverages a cache for storing partition count for different 
topics as retrieving partition count from
  * stream is a bit expensive and also the same count will be used of all 
partition consumers of the same topic.
  */
@@ -51,6 +57,10 @@ public class RealtimeConsumptionRateManager {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(RealtimeConsumptionRateManager.class);
   private static final int CACHE_ENTRY_EXPIRATION_TIME_IN_MINUTES = 10;
 
+  private static final String SERVER_CONSUMPTION_RATE_METRIC_KEY_NAME =
+      ServerMeter.REALTIME_ROWS_CONSUMED.getMeterName();
+  private ConsumptionRateLimiter _serverRateLimiter = NOOP_RATE_LIMITER;
+
   // stream config object is required for fetching the partition count from 
the stream
   private final LoadingCache<StreamConfig, Integer> 
_streamConfigToTopicPartitionCountMap;
   private volatile boolean _isThrottlingAllowed = false;
@@ -73,9 +83,28 @@ public class RealtimeConsumptionRateManager {
     _isThrottlingAllowed = true;
   }
 
+  public ConsumptionRateLimiter createServerRateLimiter(PinotConfiguration 
serverConfig, ServerMetrics serverMetrics) {
+    double serverRateLimit =
+        
serverConfig.getProperty(CommonConstants.Server.CONFIG_OF_SERVER_CONSUMPTION_RATE_LIMIT,
+            CommonConstants.Server.DEFAULT_SERVER_CONSUMPTION_RATE_LIMIT);
+    if (serverRateLimit <= 0) {
+      LOGGER.warn("Invalid server consumption rate limit: {}, throttling is 
disabled", serverRateLimit);
+      _serverRateLimiter = NOOP_RATE_LIMITER;
+    } else {
+      LOGGER.info("A server consumption rate limiter is set up with rate 
limit: {}", serverRateLimit);
+      MetricEmitter metricEmitter = new MetricEmitter(serverMetrics, 
SERVER_CONSUMPTION_RATE_METRIC_KEY_NAME);
+      _serverRateLimiter = new RateLimiterImpl(serverRateLimit, metricEmitter);
+    }
+    return _serverRateLimiter;
+  }
+
+  public ConsumptionRateLimiter getServerRateLimiter() {
+    return _serverRateLimiter;
+  }
+
   public ConsumptionRateLimiter createRateLimiter(StreamConfig streamConfig, 
String tableName,
       ServerMetrics serverMetrics, String metricKeyName) {
-    if (!streamConfig.getTopicConsumptionRateLimit().isPresent()) {
+    if (streamConfig.getTopicConsumptionRateLimit().isEmpty()) {
       return NOOP_RATE_LIMITER;
     }
     int partitionCount;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 282f3c72a1..d64e85fada 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -298,7 +298,8 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
   private final boolean _isOffHeap;
   private final boolean _nullHandlingEnabled;
   private final SegmentCommitterFactory _segmentCommitterFactory;
-  private final ConsumptionRateLimiter _rateLimiter;
+  private final ConsumptionRateLimiter _partitionRateLimiter;
+  private final ConsumptionRateLimiter _serverRateLimiter;
 
   private final StreamPartitionMsgOffset _latestStreamOffsetAtStartupTime;
   private final CompletionMode _segmentCompletionMode;
@@ -516,7 +517,8 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
    */
   private boolean processStreamEvents(MessageBatch messagesAndOffsets, long 
idlePipeSleepTimeMillis) {
     int messageCount = messagesAndOffsets.getMessageCount();
-    _rateLimiter.throttle(messageCount);
+    _partitionRateLimiter.throttle(messageCount);
+    _serverRateLimiter.throttle(messageCount);
 
     PinotMeter realtimeRowsConsumedMeter = null;
     PinotMeter realtimeRowsDroppedMeter = null;
@@ -605,6 +607,7 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
             realtimeRowsConsumedMeter =
                 _serverMetrics.addMeteredTableValue(_clientId, 
ServerMeter.REALTIME_ROWS_CONSUMED, 1,
                     realtimeRowsConsumedMeter);
+            
_serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_ROWS_CONSUMED, 1L);
           } catch (Exception e) {
             _numRowsErrored++;
             String errorMessage = String.format("Caught exception while 
indexing the record: %s", transformedRow);
@@ -1395,8 +1398,9 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
       _memoryManager = new DirectMemoryManager(_segmentNameStr, 
_serverMetrics);
     }
 
-    _rateLimiter = RealtimeConsumptionRateManager.getInstance()
+    _partitionRateLimiter = RealtimeConsumptionRateManager.getInstance()
         .createRateLimiter(_streamConfig, _tableNameWithType, _serverMetrics, 
_clientId);
+    _serverRateLimiter = 
RealtimeConsumptionRateManager.getInstance().getServerRateLimiter();
 
     List<String> sortedColumns = indexLoadingConfig.getSortedColumns();
     String sortedColumn;
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManagerTest.java
index 21c58f9afa..325066fa16 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManagerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManagerTest.java
@@ -27,14 +27,12 @@ import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.testng.annotations.Test;
 
 import static 
org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager.*;
-import static 
org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager.ConsumptionRateLimiter;
-import static 
org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager.MetricEmitter;
-import static 
org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager.NOOP_RATE_LIMITER;
-import static 
org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager.RateLimiterImpl;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -51,6 +49,10 @@ public class RealtimeConsumptionRateManagerTest {
   private static final StreamConfig STREAM_CONFIG_A = mock(StreamConfig.class);
   private static final StreamConfig STREAM_CONFIG_B = mock(StreamConfig.class);
   private static final StreamConfig STREAM_CONFIG_C = mock(StreamConfig.class);
+  private static final PinotConfiguration SERVER_CONFIG_1 = 
mock(PinotConfiguration.class);
+  private static final PinotConfiguration SERVER_CONFIG_2 = 
mock(PinotConfiguration.class);
+  private static final PinotConfiguration SERVER_CONFIG_3 = 
mock(PinotConfiguration.class);
+  private static final PinotConfiguration SERVER_CONFIG_4 = 
mock(PinotConfiguration.class);
   private static RealtimeConsumptionRateManager _consumptionRateManager;
 
   static {
@@ -65,6 +67,15 @@ public class RealtimeConsumptionRateManagerTest {
     
when(STREAM_CONFIG_B.getTopicConsumptionRateLimit()).thenReturn(Optional.of(RATE_LIMIT_FOR_ENTIRE_TOPIC));
     
when(STREAM_CONFIG_C.getTopicConsumptionRateLimit()).thenReturn(Optional.empty());
     _consumptionRateManager = new RealtimeConsumptionRateManager(cache);
+
+    
when(SERVER_CONFIG_1.getProperty(CommonConstants.Server.CONFIG_OF_SERVER_CONSUMPTION_RATE_LIMIT,
+        
CommonConstants.Server.DEFAULT_SERVER_CONSUMPTION_RATE_LIMIT)).thenReturn(5.0);
+    
when(SERVER_CONFIG_2.getProperty(CommonConstants.Server.CONFIG_OF_SERVER_CONSUMPTION_RATE_LIMIT,
+        
CommonConstants.Server.DEFAULT_SERVER_CONSUMPTION_RATE_LIMIT)).thenReturn(2.5);
+    
when(SERVER_CONFIG_3.getProperty(CommonConstants.Server.CONFIG_OF_SERVER_CONSUMPTION_RATE_LIMIT,
+        
CommonConstants.Server.DEFAULT_SERVER_CONSUMPTION_RATE_LIMIT)).thenReturn(0.0);
+    
when(SERVER_CONFIG_4.getProperty(CommonConstants.Server.CONFIG_OF_SERVER_CONSUMPTION_RATE_LIMIT,
+        
CommonConstants.Server.DEFAULT_SERVER_CONSUMPTION_RATE_LIMIT)).thenReturn(-1.0);
   }
 
   @Test
@@ -83,7 +94,27 @@ public class RealtimeConsumptionRateManagerTest {
   }
 
   @Test
-  public void testBuildCache() throws Exception {
+  public void testCreateServerRateLimiter() {
+    // Server config 1
+    ConsumptionRateLimiter rateLimiter = 
_consumptionRateManager.createServerRateLimiter(SERVER_CONFIG_1, null);
+    assertEquals(5.0, ((RateLimiterImpl) rateLimiter).getRate(), DELTA);
+
+    // Server config 2
+    rateLimiter = 
_consumptionRateManager.createServerRateLimiter(SERVER_CONFIG_2, null);
+    assertEquals(2.5, ((RateLimiterImpl) rateLimiter).getRate(), DELTA);
+
+    // Server config 3
+    rateLimiter = 
_consumptionRateManager.createServerRateLimiter(SERVER_CONFIG_3, null);
+    assertEquals(rateLimiter, NOOP_RATE_LIMITER);
+
+    // Server config 4
+    rateLimiter = 
_consumptionRateManager.createServerRateLimiter(SERVER_CONFIG_4, null);
+    assertEquals(rateLimiter, NOOP_RATE_LIMITER);
+  }
+
+  @Test
+  public void testBuildCache()
+      throws Exception {
     PartitionCountFetcher partitionCountFetcher = 
mock(PartitionCountFetcher.class);
     LoadingCache<StreamConfig, Integer> cache = 
buildCache(partitionCountFetcher, 500, TimeUnit.MILLISECONDS);
     when(partitionCountFetcher.fetch(STREAM_CONFIG_A)).thenReturn(10);
@@ -150,21 +181,21 @@ public class RealtimeConsumptionRateManagerTest {
     now = Clock.fixed(Instant.parse("2022-08-10T12:01:05Z"), 
ZoneOffset.UTC).instant();
     int sumOfMsgsInPrevMinute = sum(numMsgs);
     int expectedRatio = calcExpectedRatio(rateLimitInMinutes, 
sumOfMsgsInPrevMinute);
-    numMsgs = new int[] {35};
+    numMsgs = new int[]{35};
     assertEquals(metricEmitter.emitMetric(numMsgs[0], rateLimit, now), 
expectedRatio);
 
     // 3rd minute
     now = Clock.fixed(Instant.parse("2022-08-10T12:02:25Z"), 
ZoneOffset.UTC).instant();
     sumOfMsgsInPrevMinute = sum(numMsgs);
     expectedRatio = calcExpectedRatio(rateLimitInMinutes, 
sumOfMsgsInPrevMinute);
-    numMsgs = new int[] {0};
+    numMsgs = new int[]{0};
     assertEquals(metricEmitter.emitMetric(numMsgs[0], rateLimit, now), 
expectedRatio);
 
     // 4th minute
     now = Clock.fixed(Instant.parse("2022-08-10T12:03:15Z"), 
ZoneOffset.UTC).instant();
     sumOfMsgsInPrevMinute = sum(numMsgs);
     expectedRatio = calcExpectedRatio(rateLimitInMinutes, 
sumOfMsgsInPrevMinute);
-    numMsgs = new int[] {10, 20};
+    numMsgs = new int[]{10, 20};
     assertEquals(metricEmitter.emitMetric(numMsgs[0], rateLimit, now), 
expectedRatio);
     now = Clock.fixed(Instant.parse("2022-08-10T12:03:20Z"), 
ZoneOffset.UTC).instant();
     assertEquals(metricEmitter.emitMetric(numMsgs[1], rateLimit, now), 
expectedRatio);
@@ -173,7 +204,7 @@ public class RealtimeConsumptionRateManagerTest {
     now = Clock.fixed(Instant.parse("2022-08-10T12:04:30Z"), 
ZoneOffset.UTC).instant();
     sumOfMsgsInPrevMinute = sum(numMsgs);
     expectedRatio = calcExpectedRatio(rateLimitInMinutes, 
sumOfMsgsInPrevMinute);
-    numMsgs = new int[] {5};
+    numMsgs = new int[]{5};
     assertEquals(metricEmitter.emitMetric(numMsgs[0], rateLimit, now), 
expectedRatio);
   }
 
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeConsumptionRateLimiterClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeConsumptionRateLimiterClusterIntegrationTest.java
new file mode 100644
index 0000000000..ca1253a0f6
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeConsumptionRateLimiterClusterIntegrationTest.java
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.metrics.PinotMeter;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class RealtimeConsumptionRateLimiterClusterIntegrationTest extends 
BaseRealtimeClusterIntegrationTest {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(RealtimeConsumptionRateLimiterClusterIntegrationTest.class);
+
+  private static final String CONSUMER_DIRECTORY = "/tmp/consumer-test";
+  private static final long RANDOM_SEED = System.currentTimeMillis();
+  private static final Random RANDOM = new Random(RANDOM_SEED);
+  private static final double SERVER_RATE_LIMIT = 100;
+
+  private final boolean _isDirectAlloc = RANDOM.nextBoolean();
+  private final boolean _isConsumerDirConfigured = RANDOM.nextBoolean();
+  private final boolean _enableLeadControllerResource = RANDOM.nextBoolean();
+  private List<File> _avroFiles;
+
+  @Override
+  protected String getLoadMode() {
+    return ReadMode.mmap.name();
+  }
+
+  @Override
+  public void startController()
+      throws Exception {
+    super.startController();
+    
enableResourceConfigForLeadControllerResource(_enableLeadControllerResource);
+  }
+
+  @Override
+  protected void overrideServerConf(PinotConfiguration configuration) {
+    
configuration.setProperty(CommonConstants.Server.CONFIG_OF_REALTIME_OFFHEAP_ALLOCATION,
 true);
+    
configuration.setProperty(CommonConstants.Server.CONFIG_OF_REALTIME_OFFHEAP_DIRECT_ALLOCATION,
 _isDirectAlloc);
+    if (_isConsumerDirConfigured) {
+      configuration.setProperty(CommonConstants.Server.CONFIG_OF_CONSUMER_DIR, 
CONSUMER_DIRECTORY);
+    }
+    
configuration.setProperty(CommonConstants.Server.CONFIG_OF_SERVER_CONSUMPTION_RATE_LIMIT,
 SERVER_RATE_LIMIT);
+  }
+
+  @Override
+  protected IngestionConfig getIngestionConfig() {
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setStreamIngestionConfig(
+        new 
StreamIngestionConfig(Collections.singletonList(getStreamConfigMap())));
+    return ingestionConfig;
+  }
+
+  @Override
+  protected long getCountStarResult() {
+    // all the data that was ingested from Kafka also got uploaded via the 
controller's upload endpoint
+    return super.getCountStarResult() * 2;
+  }
+
+  @BeforeClass
+  @Override
+  public void setUp()
+      throws Exception {
+    // Remove the consumer directory
+    FileUtils.deleteQuietly(new File(CONSUMER_DIRECTORY));
+
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir);
+
+    // Start the Pinot cluster
+    startZk();
+    startController();
+    startBroker();
+    startServer();
+
+    // Start Kafka
+    startKafka();
+
+    // Unpack the Avro files
+    _avroFiles = unpackAvroData(_tempDir);
+
+    // Push data into Kafka
+    pushAvroIntoKafka(_avroFiles);
+  }
+
+  @AfterClass
+  @Override
+  public void tearDown()
+      throws Exception {
+    FileUtils.deleteDirectory(new File(CONSUMER_DIRECTORY));
+    stopServer();
+    stopBroker();
+    stopController();
+    stopKafka();
+    stopZk();
+    FileUtils.deleteDirectory(_tempDir);
+  }
+
+  @Test
+  public void testOneTableRateLimit()
+      throws Exception {
+    String tableName = getTableName();
+    try {
+      // Create and upload the schema and table config
+      Schema schema = createSchema();
+      addSchema(schema);
+      long startTime = System.currentTimeMillis();
+      TableConfig tableConfig = createRealtimeTableConfig(_avroFiles.get(0));
+      addTableConfig(tableConfig);
+      for (int i = 0; i < 60; i++) {
+        if (!isTableLoaded(tableName)) {
+          Thread.sleep(1000L);
+        } else {
+          break;
+        }
+      }
+      PinotMeter realtimeRowConsumedMeter = 
ServerMetrics.get().getMeteredValue(ServerMeter.REALTIME_ROWS_CONSUMED);
+      long startCount = getCurrentCountStarResult(tableName);
+      for (int i = 1; i <= 10; i++) {
+        Thread.sleep(1000L);
+        long currentCount = getCurrentCountStarResult(tableName);
+        double currentRate = (currentCount - startCount) / (double) 
(System.currentTimeMillis() - startTime) * 1000;
+        LOGGER.info("Second = " + i + ", realtimeRowConsumedMeter = " + 
realtimeRowConsumedMeter.oneMinuteRate()
+            + ", currentCount = " + currentCount + ", currentRate = " + 
currentRate);
+        Assert.assertTrue(realtimeRowConsumedMeter.oneMinuteRate() < 
SERVER_RATE_LIMIT,
+            "Rate should be less than " + SERVER_RATE_LIMIT);
+        Assert.assertTrue(currentRate < SERVER_RATE_LIMIT * 1.5, // Put some 
leeway for the rate calculation
+            "Rate should be less than " + SERVER_RATE_LIMIT);
+      }
+    } finally {
+      dropRealtimeTable(tableName);
+      
waitForTableDataManagerRemoved(TableNameBuilder.REALTIME.tableNameWithType(tableName));
+    }
+  }
+
+  @Test
+  public void testTwoTableRateLimit()
+      throws Exception {
+    String tableName1 = "testTable1";
+    String tableName2 = "testTable2";
+
+    try {
+      // Create and upload the schema and table config
+      Schema schema1 = createSchema();
+      schema1.setSchemaName("testTable1");
+      addSchema(schema1);
+      Schema schema2 = createSchema();
+      schema2.setSchemaName("testTable2");
+      addSchema(schema2);
+      long startTime = System.currentTimeMillis();
+
+      TableConfig tableConfig1 = createRealtimeTableConfig(tableName1);
+      addTableConfig(tableConfig1);
+      TableConfig tableConfig2 = createRealtimeTableConfig(tableName2);
+      addTableConfig(tableConfig2);
+      for (int i = 0; i < 60; i++) {
+        if (!isTableLoaded(tableName1) || !isTableLoaded(tableName2)) {
+          Thread.sleep(1000L);
+        } else {
+          break;
+        }
+      }
+
+      PinotMeter serverRowConsumedMeter = 
ServerMetrics.get().getMeteredValue(ServerMeter.REALTIME_ROWS_CONSUMED);
+      long startCount1 = getCurrentCountStarResult(tableName1);
+      long startCount2 = getCurrentCountStarResult(tableName2);
+      for (int i = 1; i <= 10; i++) {
+        Thread.sleep(1000L);
+        long currentCount1 = getCurrentCountStarResult(tableName1);
+        long currentCount2 = getCurrentCountStarResult(tableName2);
+        long currentServerCount = currentCount1 + currentCount2;
+        long currentTimeMillis = System.currentTimeMillis();
+        double currentRate1 = (currentCount1 - startCount1) / (double) 
(currentTimeMillis - startTime) * 1000;
+        double currentRate2 = (currentCount2 - startCount2) / (double) 
(currentTimeMillis - startTime) * 1000;
+        double currentServerRate = currentRate1 + currentRate2;
+        LOGGER.info("Second = " + i + ", serverRowConsumedMeter = " + 
serverRowConsumedMeter.oneMinuteRate()
+            + ", currentCount1 = " + currentCount1 + ", currentRate1 = " + 
currentRate1
+            + ", currentCount2 = " + currentCount2 + ", currentRate2 = " + 
currentRate2
+            + ", currentServerCount = " + currentServerCount + ", 
currentServerRate = " + currentServerRate
+        );
+
+        Assert.assertTrue(serverRowConsumedMeter.oneMinuteRate() < 
SERVER_RATE_LIMIT,
+            "Rate should be less than " + SERVER_RATE_LIMIT + ", 
serverOneMinuteRate = " + serverRowConsumedMeter
+                .oneMinuteRate());
+        Assert.assertTrue(currentServerRate < SERVER_RATE_LIMIT * 1.5,
+            // Put some leeway for the rate calculation
+            "Whole table ingestion rate should be less than " + 
SERVER_RATE_LIMIT + ", currentRate1 = " + currentRate1
+                + ", currentRate2 = " + currentRate2 + ", currentServerRate = 
" + currentServerRate);
+      }
+    } finally {
+      dropRealtimeTable(tableName1);
+      
waitForTableDataManagerRemoved(TableNameBuilder.REALTIME.tableNameWithType(tableName1));
+      dropRealtimeTable(tableName2);
+      
waitForTableDataManagerRemoved(TableNameBuilder.REALTIME.tableNameWithType(tableName2));
+    }
+  }
+
+  protected TableConfig createRealtimeTableConfig() {
+    return createRealtimeTableConfig(getTableName());
+  }
+
+  protected TableConfig createRealtimeTableConfig(String tableName) {
+    return new TableConfigBuilder(TableType.REALTIME).setTableName(tableName)
+        
.setTimeColumnName(getTimeColumnName()).setSortedColumn(getSortedColumn())
+        
.setInvertedIndexColumns(getInvertedIndexColumns()).setNoDictionaryColumns(getNoDictionaryColumns())
+        
.setRangeIndexColumns(getRangeIndexColumns()).setBloomFilterColumns(getBloomFilterColumns())
+        
.setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion())
+        
.setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig()).setBrokerTenant(getBrokerTenant())
+        
.setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig()).setQueryConfig(getQueryConfig())
+        
.setStreamConfigs(getStreamConfigs()).setNullHandlingEnabled(getNullHandlingEnabled()).build();
+  }
+
+  private boolean isTableLoaded(String tableName) {
+    try {
+      return getCurrentCountStarResult(tableName) > 0;
+    } catch (Exception e) {
+      return false;
+    }
+  }
+
+  @Override
+  protected Map<String, String> getStreamConfigs() {
+    return null;
+  }
+
+  @Test(enabled = false)
+  public void testDictionaryBasedQueries(boolean useMultiStageQueryEngine) {
+    // Do nothing
+  }
+
+  @Test(enabled = false)
+  public void testGeneratedQueries(boolean useMultiStageQueryEngine) {
+    // Do nothing
+  }
+
+  @Test(enabled = false)
+  public void testHardcodedQueries(boolean useMultiStageQueryEngine) {
+    // Do nothing
+  }
+
+  @Test(enabled = false)
+  public void testInstanceShutdown() {
+    // Do nothing
+  }
+
+  @Test(enabled = false)
+  public void testQueriesFromQueryFile(boolean useMultiStageQueryEngine) {
+    // Do nothing
+  }
+
+  @Test(enabled = false)
+  public void testQueryExceptions(boolean useMultiStageQueryEngine) {
+    // Do nothing
+  }
+
+  @Test(enabled = false)
+  public void testHardcodedServerPartitionedSqlQueries() {
+    // Do nothing
+  }
+}
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 2b771707d2..10d9e6bec1 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -205,7 +205,6 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
     ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(
         
_serverConf.getProperty(Server.CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT,
             Server.DEFAULT_THREAD_ALLOCATED_BYTES_MEASUREMENT));
-
     // Set data table version send to broker.
     int dataTableVersion =
         _serverConf.getProperty(Server.CONFIG_OF_CURRENT_DATA_TABLE_VERSION, 
DataTableBuilderFactory.DEFAULT_VERSION);
@@ -573,6 +572,10 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
     ServerConf serverConf = new ServerConf(_serverConf);
     _serverInstance = new ServerInstance(serverConf, _helixManager, 
accessControlFactory);
     ServerMetrics serverMetrics = _serverInstance.getServerMetrics();
+
+    // Enable server-level realtime ingestion rate limiter
+    
RealtimeConsumptionRateManager.getInstance().createServerRateLimiter(_serverConf,
 serverMetrics);
+
     InstanceDataManager instanceDataManager = 
_serverInstance.getInstanceDataManager();
     instanceDataManager.setSupplierOfIsServerReadyToServeQueries(() -> 
_isServerReadyToServeQueries);
     // initialize the thread accountant for query killing
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 1ba40c9003..190de27151 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -619,6 +619,11 @@ public class CommonConstants {
     // This is also the default in the case a user misconfigures this by 
setting to <= 0.
     public static final int DEFAULT_STARTUP_REALTIME_MIN_FRESHNESS_MS = 10000;
 
+    // Config for the server-level realtime consumption message rate limit
+    public static final String CONFIG_OF_SERVER_CONSUMPTION_RATE_LIMIT = 
"pinot.server.consumption.rate.limit";
+    // Defaults to 0.0; any value <= 0 disables server-level throttling
+    public static final double DEFAULT_SERVER_CONSUMPTION_RATE_LIMIT = 0.0;
+
     public static final String DEFAULT_READ_MODE = "mmap";
     // Whether to reload consuming segment on scheme update
     public static final boolean DEFAULT_RELOAD_CONSUMING_SEGMENT = true;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to