This is an automated email from the ASF dual-hosted git repository. gortiz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 0fc838fadb MSE throttling: Set a hard limit on number of MSE threads (#15143) 0fc838fadb is described below commit 0fc838fadbca80c6484ed7170470c72f91814d9e Author: Alberto Bastos <alberto.var...@startree.ai> AuthorDate: Mon Mar 3 15:29:48 2025 +0100 MSE throttling: Set a hard limit on number of MSE threads (#15143) --- .../apache/pinot/query/runtime/QueryRunner.java | 12 ++- .../pinot/spi/executor/HardLimitExecutor.java | 100 +++++++++++++++++++++ .../apache/pinot/spi/utils/CommonConstants.java | 3 + .../pinot/spi/executor/HardLimitExecutorTest.java | 62 +++++++++++++ 4 files changed, 174 insertions(+), 3 deletions(-) diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java index 6fc87f0ed9..feefed0f1b 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java @@ -73,6 +73,7 @@ import org.apache.pinot.query.runtime.timeseries.serde.TimeSeriesBlockSerde; import org.apache.pinot.spi.accounting.ThreadExecutionContext; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.executor.ExecutorServiceUtils; +import org.apache.pinot.spi.executor.HardLimitExecutor; import org.apache.pinot.spi.trace.LoggerConstants; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey; @@ -174,9 +175,14 @@ public class QueryRunner { String joinOverflowModeStr = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_JOIN_OVERFLOW_MODE); _joinOverflowMode = joinOverflowModeStr != null ? JoinOverFlowMode.valueOf(joinOverflowModeStr) : null; - _executorService = - ExecutorServiceUtils.create(config, Server.MULTISTAGE_EXECUTOR_CONFIG_PREFIX, "query-runner-on-" + port, - Server.DEFAULT_MULTISTAGE_EXECUTOR_TYPE); + _executorService = ExecutorServiceUtils.create(config, Server.MULTISTAGE_EXECUTOR_CONFIG_PREFIX, + "query-runner-on-" + port, Server.DEFAULT_MULTISTAGE_EXECUTOR_TYPE); + + int hardLimit = HardLimitExecutor.getMultiStageExecutorHardLimit(config); + if (hardLimit > 0) { + _executorService = new HardLimitExecutor(hardLimit, _executorService); + } + _opChainScheduler = new OpChainSchedulerService(_executorService); _mailboxService = new MailboxService(hostname, port, config, tlsConfig); try { diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/executor/HardLimitExecutor.java b/pinot-spi/src/main/java/org/apache/pinot/spi/executor/HardLimitExecutor.java new file mode 100644 index 0000000000..3a83fbb5b3 --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/executor/HardLimitExecutor.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.executor; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.CommonConstants; + + +/** + * An Executor that allows a maximum of tasks running at the same time, rejecting immediately any excess. + */ +public class HardLimitExecutor extends DecoratorExecutorService { + + private final AtomicInteger _running; + private final int _max; + + public HardLimitExecutor(int max, ExecutorService executorService) { + super(executorService); + _running = new AtomicInteger(0); + _max = max; + } + + /** + * Returns the hard limit of the number of threads that can be used by the multi-stage executor. + * @param config Pinot configuration + * @return hard limit of the number of threads that can be used by the multi-stage executor (no hard limit if <= 0) + */ + public static int getMultiStageExecutorHardLimit(PinotConfiguration config) { + try { + int maxThreads = Integer.parseInt(config.getProperty( + CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS, + CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS + )); + int hardLimitFactor = Integer.parseInt(config.getProperty( + CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_HARDLIMIT_FACTOR, + CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_HARDLIMIT_FACTOR + )); + if (maxThreads <= 0 || hardLimitFactor <= 0) { + return 0; + } + return maxThreads * hardLimitFactor; + } catch (NumberFormatException e) { + return Integer.parseInt(CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS) + * Integer.parseInt(CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_HARDLIMIT_FACTOR); + } + } + + protected void checkTaskAllowed() { + if (_running.get() >= _max) { + throw new IllegalStateException("Tasks limit exceeded."); + } + } + + @Override + protected <T> Callable<T> decorate(Callable<T> task) { + checkTaskAllowed(); + return () -> { + checkTaskAllowed(); + _running.getAndIncrement(); + try { + return task.call(); + } finally { + _running.decrementAndGet(); + } + }; + } + + @Override + protected Runnable decorate(Runnable task) { + checkTaskAllowed(); + return () -> { + checkTaskAllowed(); + _running.getAndIncrement(); + try { + task.run(); + } finally { + _running.decrementAndGet(); + } + }; + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 9ba8b4f85f..b0db23e488 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -248,6 +248,9 @@ public class CommonConstants { public static final String CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS = "pinot.beta.multistage.engine.max.server.query.threads"; public static final String DEFAULT_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS = "-1"; + public static final String CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_HARDLIMIT_FACTOR = + "pinot.beta.multistage.engine.max.server.query.threads.hardlimit.factor"; + public static final String DEFAULT_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_HARDLIMIT_FACTOR = "4"; // Preprocess throttle configs public static final String CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM = diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/executor/HardLimitExecutorTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/executor/HardLimitExecutorTest.java new file mode 100644 index 0000000000..4dc71ee092 --- /dev/null +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/executor/HardLimitExecutorTest.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.executor; + +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.Executors; +import org.testng.annotations.Test; + +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.fail; + + +public class HardLimitExecutorTest { + + @Test + public void testHardLimit() + throws Exception { + HardLimitExecutor ex = new HardLimitExecutor(1, Executors.newCachedThreadPool()); + CyclicBarrier barrier = new CyclicBarrier(2); + + try { + ex.execute(() -> { + try { + barrier.await(); + Thread.sleep(Long.MAX_VALUE); + } catch (InterruptedException | BrokenBarrierException e) { + // do nothing + } + }); + + barrier.await(); + try { + ex.execute(() -> { + // do nothing + }); + fail("Should not allow more than 1 task"); + } catch (Exception e) { + // as expected + assertEquals("Tasks limit exceeded.", e.getMessage()); + } + } finally { + ex.shutdownNow(); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org