This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 0fc838fadb MSE throttling: Set a hard limit on number of MSE threads 
(#15143)
0fc838fadb is described below

commit 0fc838fadbca80c6484ed7170470c72f91814d9e
Author: Alberto Bastos <alberto.var...@startree.ai>
AuthorDate: Mon Mar 3 15:29:48 2025 +0100

    MSE throttling: Set a hard limit on number of MSE threads (#15143)
---
 .../apache/pinot/query/runtime/QueryRunner.java    |  12 ++-
 .../pinot/spi/executor/HardLimitExecutor.java      | 100 +++++++++++++++++++++
 .../apache/pinot/spi/utils/CommonConstants.java    |   3 +
 .../pinot/spi/executor/HardLimitExecutorTest.java  |  62 +++++++++++++
 4 files changed, 174 insertions(+), 3 deletions(-)

diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 6fc87f0ed9..feefed0f1b 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -73,6 +73,7 @@ import 
org.apache.pinot.query.runtime.timeseries.serde.TimeSeriesBlockSerde;
 import org.apache.pinot.spi.accounting.ThreadExecutionContext;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.executor.ExecutorServiceUtils;
+import org.apache.pinot.spi.executor.HardLimitExecutor;
 import org.apache.pinot.spi.trace.LoggerConstants;
 import org.apache.pinot.spi.utils.CommonConstants;
 import 
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
@@ -174,9 +175,14 @@ public class QueryRunner {
     String joinOverflowModeStr = 
config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_JOIN_OVERFLOW_MODE);
     _joinOverflowMode = joinOverflowModeStr != null ? 
JoinOverFlowMode.valueOf(joinOverflowModeStr) : null;
 
-    _executorService =
-        ExecutorServiceUtils.create(config, 
Server.MULTISTAGE_EXECUTOR_CONFIG_PREFIX, "query-runner-on-" + port,
-            Server.DEFAULT_MULTISTAGE_EXECUTOR_TYPE);
+    _executorService = ExecutorServiceUtils.create(config, 
Server.MULTISTAGE_EXECUTOR_CONFIG_PREFIX,
+        "query-runner-on-" + port, Server.DEFAULT_MULTISTAGE_EXECUTOR_TYPE);
+
+    int hardLimit = HardLimitExecutor.getMultiStageExecutorHardLimit(config);
+    if (hardLimit > 0) {
+      _executorService = new HardLimitExecutor(hardLimit, _executorService);
+    }
+
     _opChainScheduler = new OpChainSchedulerService(_executorService);
     _mailboxService = new MailboxService(hostname, port, config, tlsConfig);
     try {
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/executor/HardLimitExecutor.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/executor/HardLimitExecutor.java
new file mode 100644
index 0000000000..3a83fbb5b3
--- /dev/null
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/executor/HardLimitExecutor.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.executor;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/**
+ * An {@link ExecutorService} decorator that allows at most a fixed number of tasks to run at the same time and
+ * rejects any excess immediately with an {@link IllegalStateException} instead of queueing it.
+ *
+ * <p>The limit is checked twice: once when a task is decorated (a cheap, best-effort early rejection) and once
+ * more, atomically, right before the task body starts running. Only the second check reserves a slot, so only
+ * that one actually enforces the limit.
+ */
+public class HardLimitExecutor extends DecoratorExecutorService {
+
+  // Number of decorated tasks whose body is currently executing.
+  private final AtomicInteger _running;
+  // Maximum number of tasks allowed to execute concurrently.
+  private final int _max;
+
+  /**
+   * @param max maximum number of tasks allowed to run at the same time
+   * @param executorService executor handed to the parent decorator
+   */
+  public HardLimitExecutor(int max, ExecutorService executorService) {
+    super(executorService);
+    _running = new AtomicInteger(0);
+    _max = max;
+  }
+
+  /**
+   * Returns the hard limit of the number of threads that can be used by the multi-stage executor.
+   *
+   * <p>The limit is the configured maximum number of server query threads multiplied by the configured hard limit
+   * factor. If either value is not positive, no hard limit applies and 0 is returned. A product that does not fit
+   * in an {@code int} is clamped to {@link Integer#MAX_VALUE} rather than being allowed to wrap around.
+   *
+   * @param config Pinot configuration
+   * @return hard limit of the number of threads that can be used by the multi-stage executor (no hard limit if
+   *         {@code <= 0})
+   */
+  public static int getMultiStageExecutorHardLimit(PinotConfiguration config) {
+    try {
+      int maxThreads = Integer.parseInt(config.getProperty(
+          CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS,
+          CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS
+      ));
+      int hardLimitFactor = Integer.parseInt(config.getProperty(
+          CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_HARDLIMIT_FACTOR,
+          CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_HARDLIMIT_FACTOR
+      ));
+      if (maxThreads <= 0 || hardLimitFactor <= 0) {
+        return 0;
+      }
+      // Multiply in long: an int overflow could wrap to a negative value (silently disabling the limit) or to a
+      // misleadingly small positive one.
+      return (int) Math.min(Integer.MAX_VALUE, (long) maxThreads * hardLimitFactor);
+    } catch (NumberFormatException e) {
+      // A configured value is not a valid integer: fall back to the product of the two defaults.
+      return Integer.parseInt(CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS)
+          * Integer.parseInt(CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_HARDLIMIT_FACTOR);
+    }
+  }
+
+  /**
+   * Best-effort check that fails fast when the limit is already reached. It does not reserve a slot.
+   *
+   * @throws IllegalStateException if the number of running tasks has reached the limit
+   */
+  protected void checkTaskAllowed() {
+    if (_running.get() >= _max) {
+      throw new IllegalStateException("Tasks limit exceeded.");
+    }
+  }
+
+  /**
+   * Atomically reserves one running slot.
+   *
+   * <p>The check and the increment are done in a single compare-and-set loop. A separate "check, then increment"
+   * would let several threads pass the check together and push the counter above the limit.
+   *
+   * @throws IllegalStateException if the number of running tasks has reached the limit
+   */
+  private void acquireSlot() {
+    int current;
+    do {
+      current = _running.get();
+      if (current >= _max) {
+        throw new IllegalStateException("Tasks limit exceeded.");
+      }
+    } while (!_running.compareAndSet(current, current + 1));
+  }
+
+  /**
+   * Wraps the task so that it reserves a slot before running and releases it when done.
+   *
+   * @throws IllegalStateException if the limit is already reached when the task is decorated
+   */
+  @Override
+  protected <T> Callable<T> decorate(Callable<T> task) {
+    // Early rejection, before the task is handed over for execution.
+    checkTaskAllowed();
+    return () -> {
+      // The limit may have been reached between decoration and execution, so reserve the slot atomically here.
+      acquireSlot();
+      try {
+        return task.call();
+      } finally {
+        _running.decrementAndGet();
+      }
+    };
+  }
+
+  /**
+   * Wraps the task so that it reserves a slot before running and releases it when done.
+   *
+   * @throws IllegalStateException if the limit is already reached when the task is decorated
+   */
+  @Override
+  protected Runnable decorate(Runnable task) {
+    // Early rejection, before the task is handed over for execution.
+    checkTaskAllowed();
+    return () -> {
+      // The limit may have been reached between decoration and execution, so reserve the slot atomically here.
+      acquireSlot();
+      try {
+        task.run();
+      } finally {
+        _running.decrementAndGet();
+      }
+    };
+  }
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 9ba8b4f85f..b0db23e488 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -248,6 +248,9 @@ public class CommonConstants {
     public static final String 
CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS =
         "pinot.beta.multistage.engine.max.server.query.threads";
     public static final String 
DEFAULT_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS = "-1";
+    public static final String 
CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_HARDLIMIT_FACTOR =
+        
"pinot.beta.multistage.engine.max.server.query.threads.hardlimit.factor";
+    public static final String 
DEFAULT_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_HARDLIMIT_FACTOR = "4";
 
     // Preprocess throttle configs
     public static final String CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM =
diff --git 
a/pinot-spi/src/test/java/org/apache/pinot/spi/executor/HardLimitExecutorTest.java
 
b/pinot-spi/src/test/java/org/apache/pinot/spi/executor/HardLimitExecutorTest.java
new file mode 100644
index 0000000000..4dc71ee092
--- /dev/null
+++ 
b/pinot-spi/src/test/java/org/apache/pinot/spi/executor/HardLimitExecutorTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.executor;
+
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.Executors;
+import org.testng.annotations.Test;
+
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.fail;
+
+
+public class HardLimitExecutorTest {
+
+  /**
+   * With a limit of one task, a second task submitted while the first is still running must be rejected with
+   * the "Tasks limit exceeded." message.
+   */
+  @Test
+  public void testHardLimit()
+      throws Exception {
+    CyclicBarrier firstTaskRunning = new CyclicBarrier(2);
+    HardLimitExecutor executor = new HardLimitExecutor(1, Executors.newCachedThreadPool());
+
+    // Meets the test thread at the barrier, then sleeps until interrupted so that it keeps running.
+    Runnable blockingTask = () -> {
+      try {
+        firstTaskRunning.await();
+        Thread.sleep(Long.MAX_VALUE);
+      } catch (InterruptedException | BrokenBarrierException e) {
+        // do nothing
+      }
+    };
+    Runnable noOpTask = () -> {
+      // do nothing
+    };
+
+    try {
+      executor.execute(blockingTask);
+      // Do not submit the second task until the first one is known to be executing.
+      firstTaskRunning.await();
+
+      Exception rejection = null;
+      try {
+        executor.execute(noOpTask);
+      } catch (Exception e) {
+        // as expected
+        rejection = e;
+      }
+
+      if (rejection == null) {
+        fail("Should not allow more than 1 task");
+      }
+      assertEquals("Tasks limit exceeded.", rejection.getMessage());
+    } finally {
+      // Presumably interrupts the sleeping first task through the wrapped executor - not visible from this class.
+      executor.shutdownNow();
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to