This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 0169021a55 Update the segment operation throttle defaults to Integer.MAX_VALUE (#15126) 0169021a55 is described below commit 0169021a55a60430676dcae0d34dc1dd413aa031 Author: Sonam Mandal <sonam.man...@startree.ai> AuthorDate: Wed Feb 26 00:03:29 2025 -0800 Update the segment operation throttle defaults to Integer.MAX_VALUE (#15126) --- .../utils/SegmentOperationsThrottlerTest.java | 75 ++++++++++++---------- .../apache/pinot/spi/utils/CommonConstants.java | 21 ++++-- 2 files changed, 55 insertions(+), 41 deletions(-) diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/SegmentOperationsThrottlerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/SegmentOperationsThrottlerTest.java index b560dd6602..1ceff184af 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/SegmentOperationsThrottlerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/SegmentOperationsThrottlerTest.java @@ -308,11 +308,14 @@ public class SegmentOperationsThrottlerTest { ? Integer.parseInt( CommonConstants.Helix.DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES) : Integer.parseInt(CommonConstants.Helix.DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES); + // Default is too high: Integer.MAX_VALUE, take a limited number of permits so that the test doesn't take too + // long to finish + int numPermitsToTake = 10000; // We set isServingQueries to false when the server is not yet ready to server queries. In this scenario ideally // preprocessing more segments is acceptable and cannot affect the query performance Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery); Assert.assertEquals(operationsThrottler.availablePermits(), defaultPermitsBeforeQuery); - for (int i = 0; i < defaultPermitsBeforeQuery; i++) { + for (int i = 0; i < numPermitsToTake; i++) { operationsThrottler.acquire(); Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery); Assert.assertEquals(operationsThrottler.availablePermits(), defaultPermitsBeforeQuery - i - 1); @@ -322,12 +325,11 @@ public class SegmentOperationsThrottlerTest { operationsThrottler.startServingQueries(); Assert.assertEquals(operationsThrottler.totalPermits(), initialPermits); Assert.assertEquals(operationsThrottler.availablePermits(), - initialPermits - defaultPermitsBeforeQuery); + initialPermits - numPermitsToTake); - for (int i = 0; i < defaultPermitsBeforeQuery; i++) { + for (int i = 0; i < numPermitsToTake; i++) { operationsThrottler.release(); - Assert.assertEquals(operationsThrottler.availablePermits(), - (initialPermits - defaultPermitsBeforeQuery) + i + 1); + Assert.assertEquals(operationsThrottler.availablePermits(), (initialPermits - numPermitsToTake) + i + 1); } Assert.assertEquals(operationsThrottler.totalPermits(), initialPermits); Assert.assertEquals(operationsThrottler.availablePermits(), initialPermits); @@ -340,11 +342,11 @@ public class SegmentOperationsThrottlerTest { int initialPermits = 4; List<BaseSegmentOperationsThrottler> segmentOperationsThrottlerList = new ArrayList<>(); segmentOperationsThrottlerList.add(new SegmentAllIndexPreprocessThrottler(initialPermits, Integer.parseInt( - CommonConstants.Helix.DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES), false)); + CommonConstants.Helix.DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES) - 5, false)); segmentOperationsThrottlerList.add(new SegmentStarTreePreprocessThrottler(initialPermits, Integer.parseInt( - CommonConstants.Helix.DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES), false)); + CommonConstants.Helix.DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES) - 5, false)); segmentOperationsThrottlerList.add(new SegmentDownloadThrottler(initialPermits, Integer.parseInt( - CommonConstants.Helix.DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES), false)); + CommonConstants.Helix.DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES) - 5, false)); for (BaseSegmentOperationsThrottler operationsThrottler : segmentOperationsThrottlerList) { int defaultPermitsBeforeQuery = operationsThrottler instanceof SegmentAllIndexPreprocessThrottler @@ -353,14 +355,17 @@ public class SegmentOperationsThrottlerTest { ? Integer.parseInt( CommonConstants.Helix.DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES) : Integer.parseInt(CommonConstants.Helix.DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES); + // Default is too high: Integer.MAX_VALUE, take a limited number of permits so that the test doesn't take too + // long to finish + int numPermitsToTake = 10000; // We set isServingQueries to false when the server is not yet ready to server queries. In this scenario ideally // preprocessing more segments is acceptable and cannot affect the query performance - Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery); - Assert.assertEquals(operationsThrottler.availablePermits(), defaultPermitsBeforeQuery); - for (int i = 0; i < defaultPermitsBeforeQuery; i++) { + Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery - 5); + Assert.assertEquals(operationsThrottler.availablePermits(), defaultPermitsBeforeQuery - 5); + for (int i = 0; i < numPermitsToTake; i++) { operationsThrottler.acquire(); - Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery); - Assert.assertEquals(operationsThrottler.availablePermits(), defaultPermitsBeforeQuery - i - 1); + Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery - 5); + Assert.assertEquals(operationsThrottler.availablePermits(), defaultPermitsBeforeQuery - i - 1 - 5); } // Double the permits for before serving queries config @@ -370,29 +375,29 @@ public class SegmentOperationsThrottlerTest { : operationsThrottler instanceof SegmentStarTreePreprocessThrottler ? CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES : CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES, - String.valueOf(defaultPermitsBeforeQuery * 2)); + String.valueOf(defaultPermitsBeforeQuery)); operationsThrottler.onChange(updatedClusterConfigs.keySet(), updatedClusterConfigs); - Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery * 2); - // We doubled permits but took all of the previous ones - Assert.assertEquals(operationsThrottler.availablePermits(), defaultPermitsBeforeQuery); + Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery); + // We increased permits but took some before the increase + Assert.assertEquals(operationsThrottler.availablePermits(), defaultPermitsBeforeQuery - numPermitsToTake); - // Take remaining permits - for (int i = 0; i < defaultPermitsBeforeQuery; i++) { + // Take more permits + for (int i = 0; i < numPermitsToTake; i++) { operationsThrottler.acquire(); - Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery * 2); - Assert.assertEquals(operationsThrottler.availablePermits(), defaultPermitsBeforeQuery - i - 1); + Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery); + Assert.assertEquals(operationsThrottler.availablePermits(), + defaultPermitsBeforeQuery - numPermitsToTake - i - 1); } // Once the server is ready to server queries, we should reset the throttling configurations to be as configured operationsThrottler.startServingQueries(); Assert.assertEquals(operationsThrottler.totalPermits(), initialPermits); Assert.assertEquals(operationsThrottler.availablePermits(), - initialPermits - (defaultPermitsBeforeQuery * 2)); + initialPermits - (numPermitsToTake * 2)); - for (int i = 0; i < defaultPermitsBeforeQuery * 2; i++) { + for (int i = 0; i < numPermitsToTake * 2; i++) { operationsThrottler.release(); - Assert.assertEquals(operationsThrottler.availablePermits(), - (initialPermits - defaultPermitsBeforeQuery * 2) + i + 1); + Assert.assertEquals(operationsThrottler.availablePermits(), (initialPermits - numPermitsToTake * 2) + i + 1); } Assert.assertEquals(operationsThrottler.totalPermits(), initialPermits); Assert.assertEquals(operationsThrottler.availablePermits(), initialPermits); @@ -418,11 +423,14 @@ public class SegmentOperationsThrottlerTest { ? Integer.parseInt( CommonConstants.Helix.DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES) : Integer.parseInt(CommonConstants.Helix.DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES); + // Default is too high: Integer.MAX_VALUE, take a limited number of permits so that the test doesn't take too + // long to finish + int numPermitsToTake = 10000; // We set isServingQueries to false when the server is not yet ready to server queries. In this scenario ideally // preprocessing more segments is acceptable and cannot affect the query performance Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery); Assert.assertEquals(operationsThrottler.availablePermits(), defaultPermitsBeforeQuery); - for (int i = 0; i < defaultPermitsBeforeQuery; i++) { + for (int i = 0; i < numPermitsToTake; i++) { operationsThrottler.acquire(); Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery); Assert.assertEquals(operationsThrottler.availablePermits(), defaultPermitsBeforeQuery - i - 1); @@ -430,27 +438,26 @@ public class SegmentOperationsThrottlerTest { // Half the permits for before serving queries config Map<String, String> updatedClusterConfigs = new HashMap<>(); + int newDefaultPermits = defaultPermitsBeforeQuery / 2; updatedClusterConfigs.put(operationsThrottler instanceof SegmentAllIndexPreprocessThrottler ? CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES : operationsThrottler instanceof SegmentStarTreePreprocessThrottler ? CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES : CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES, - String.valueOf(defaultPermitsBeforeQuery / 2)); + String.valueOf(newDefaultPermits)); operationsThrottler.onChange(updatedClusterConfigs.keySet(), updatedClusterConfigs); - Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery / 2); + Assert.assertEquals(operationsThrottler.totalPermits(), newDefaultPermits); // We doubled permits but took all of the previous ones - Assert.assertEquals(operationsThrottler.availablePermits(), -(defaultPermitsBeforeQuery / 2)); + Assert.assertEquals(operationsThrottler.availablePermits(), newDefaultPermits - numPermitsToTake); // Once the server is ready to server queries, we should reset the throttling configurations to be as configured operationsThrottler.startServingQueries(); Assert.assertEquals(operationsThrottler.totalPermits(), initialPermits); - Assert.assertEquals(operationsThrottler.availablePermits(), - initialPermits - defaultPermitsBeforeQuery); + Assert.assertEquals(operationsThrottler.availablePermits(), initialPermits - numPermitsToTake); - for (int i = 0; i < defaultPermitsBeforeQuery; i++) { + for (int i = 0; i < numPermitsToTake; i++) { operationsThrottler.release(); - Assert.assertEquals(operationsThrottler.availablePermits(), - (initialPermits - defaultPermitsBeforeQuery) + i + 1); + Assert.assertEquals(operationsThrottler.availablePermits(), (initialPermits - numPermitsToTake) + i + 1); } Assert.assertEquals(operationsThrottler.totalPermits(), initialPermits); Assert.assertEquals(operationsThrottler.availablePermits(), initialPermits); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 3933187f7f..9ba8b4f85f 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -252,26 +252,33 @@ public class CommonConstants { // Preprocess throttle configs public static final String CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM = "pinot.server.max.segment.preprocess.parallelism"; - public static final String DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM = String.valueOf(100); + // Setting to Integer.MAX_VALUE to effectively disable throttling by default + public static final String DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM = String.valueOf(Integer.MAX_VALUE); // Before serving queries is enabled, we should use a higher preprocess parallelism to process segments faster public static final String CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES = "pinot.server.max.segment.preprocess.parallelism.before.serving.queries"; - // Use the below default before enabling queries on the server - public static final String DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES = String.valueOf(100); + // Setting the before serving queries to Integer.MAX_VALUE to effectively disable throttling by default + public static final String DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES = + String.valueOf(Integer.MAX_VALUE); // Preprocess throttle config specifically for StarTree index rebuild public static final String CONFIG_OF_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM = "pinot.server.max.segment.startree.preprocess.parallelism"; - public static final String DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM = String.valueOf(100); + // Setting to Integer.MAX_VALUE to effectively disable throttling by default + public static final String DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM = String.valueOf(Integer.MAX_VALUE); public static final String CONFIG_OF_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES = "pinot.server.max.segment.startree.preprocess.parallelism.before.serving.queries"; + // Setting the before serving queries to Integer.MAX_VALUE to effectively disable throttling by default public static final String DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES = - String.valueOf(100); + String.valueOf(Integer.MAX_VALUE); public static final String CONFIG_OF_MAX_SEGMENT_DOWNLOAD_PARALLELISM = "pinot.server.max.segment.download.parallelism"; - public static final String DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM = "100"; + // Setting to Integer.MAX_VALUE to effectively disable throttling by default + public static final String DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM = String.valueOf(Integer.MAX_VALUE); public static final String CONFIG_OF_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES = "pinot.server.max.segment.download.parallelism.before.serving.queries"; - public static final String DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES = "100"; + // Setting the before serving queries to Integer.MAX_VALUE to effectively disable throttling by default + public static final String DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES = + String.valueOf(Integer.MAX_VALUE); } public static class Broker { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org