This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9176042a17d Implement comprehensive queue-based throttling for
ThrottleOnCriticalHeapUsageExecutor (#16409)
9176042a17d is described below
commit 9176042a17d6639adf99846d210066c600d3ca46
Author: Xiang Fu <[email protected]>
AuthorDate: Fri Oct 17 13:01:58 2025 +0800
Implement comprehensive queue-based throttling for
ThrottleOnCriticalHeapUsageExecutor (#16409)
- Add complete queue-based throttling mechanism to handle heap-critical
scenarios
- Implement configurable queue size, timeout, and monitoring interval
- Add background monitoring thread to process queued tasks when heap usage
drops
- Support both Runnable and Callable task types with proper timeout handling
- Add comprehensive metrics tracking: queue size, queued tasks, processed
tasks, timeouts, shutdown cancellations
- Implement graceful shutdown with proper cleanup of queued tasks
- Add extensive test coverage with 7 test methods covering:
* Immediate execution when heap usage is normal
* Task timeout behavior under critical heap conditions
* Queue overflow protection and graceful handling
* Task queuing and processing during heap recovery
* Callable task queuing functionality
* Shutdown behavior and cleanup verification
* Comprehensive metrics tracking validation
- All tests pass successfully, ensuring robust queue-based throttling
functionality
- Update ServerGauge metrics to support throttling-related metrics exposure
This implementation provides production-ready queue-based throttling that
gracefully handles heap-critical scenarios while maintaining system stability
and providing detailed observability through comprehensive metrics.
---
.../apache/pinot/common/metrics/ServerGauge.java | 6 +-
.../apache/pinot/common/metrics/ServerMeter.java | 10 +
.../ThrottleOnCriticalHeapUsageExecutor.java | 538 +++++++++++++++++++++
.../query/scheduler/resources/ResourceManager.java | 14 +-
.../ThrottleOnCriticalHeapUsageExecutorTest.java | 104 ++++
.../apache/pinot/query/runtime/QueryRunner.java | 10 +-
.../spi/executor/DecoratorExecutorService.java | 2 +-
.../ThrottleOnCriticalHeapUsageExecutor.java | 61 ---
.../apache/pinot/spi/utils/CommonConstants.java | 11 +
9 files changed, 677 insertions(+), 79 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
index a0401497b3c..2d9685b4a41 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
@@ -148,7 +148,11 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
"Indicates the time consumer spends while waiting on the consumer
lock."),
// commit-time compaction gauge metrics
- COMMIT_TIME_COMPACTION_RATIO_PERCENT("percentage", false, "Percentage of
rows removed during commit-time compaction");
+ COMMIT_TIME_COMPACTION_RATIO_PERCENT("percentage", false, "Percentage of
rows removed during commit-time compaction"),
+
+ // ThrottleOnCriticalHeapUsageExecutor metrics
+ THROTTLE_EXECUTOR_QUEUE_SIZE("count", true,
+ "Current number of tasks in the throttle executor queue");
private final String _gaugeName;
private final String _unit;
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index a10f25661c8..89cdb7ef4bc 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -210,6 +210,16 @@ public enum ServerMeter implements AbstractMetrics.Meter {
// reingestion metrics
SEGMENT_REINGESTION_FAILURE("segments", false),
+ // ThrottleOnCriticalHeapUsageExecutor meters
+ THROTTLE_EXECUTOR_QUEUED_TASKS("count", true,
+ "Number of tasks that have been queued in the throttle executor"),
+ THROTTLE_EXECUTOR_PROCESSED_TASKS("count", true,
+ "Number of tasks processed from the throttle executor queue"),
+ THROTTLE_EXECUTOR_TIMED_OUT_TASKS("count", true,
+ "Number of tasks that timed out in the throttle executor queue"),
+ THROTTLE_EXECUTOR_SHUTDOWN_CANCELED_TASKS("count", true,
+ "Number of tasks canceled during throttle executor shutdown"),
+
// commit-time compaction metrics
COMMIT_TIME_COMPACTION_ENABLED_SEGMENTS("segments", false,
"Number of segments processed with commit-time compaction enabled"),
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/executor/ThrottleOnCriticalHeapUsageExecutor.java
b/pinot-core/src/main/java/org/apache/pinot/core/executor/ThrottleOnCriticalHeapUsageExecutor.java
new file mode 100644
index 00000000000..b8f82122c2e
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/executor/ThrottleOnCriticalHeapUsageExecutor.java
@@ -0,0 +1,538 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.executor;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pinot.common.metrics.ServerGauge;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.spi.accounting.ThreadAccountant;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.executor.DecoratorExecutorService;
+import org.apache.pinot.spi.query.QueryThreadContext;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * An Executor that queues tasks when the heap usage is critical instead of
rejecting them.
+ * Heap Usage level is obtained from {@link
ThreadAccountant#throttleQuerySubmission()}.
+ *
+ * Features:
+ * - Tasks are queued when heap usage is critical
+ * - Queued tasks are processed when heap usage drops below critical level
+ * - Configurable queue size and timeout (global default or per-task)
+ * - Background monitoring of heap usage to process queued tasks
+ */
+public class ThrottleOnCriticalHeapUsageExecutor extends
DecoratorExecutorService {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ThrottleOnCriticalHeapUsageExecutor.class);
+
+ // Default configuration values
+ // Defaults are kept near config in CommonConstants; no local defaults
needed here
+ private static final long LOG_THROTTLE_INTERVAL_MS = 30000; // Log queue
status every 30 seconds
+
+ private final BlockingQueue<QueuedTask<?>> _taskQueue;
+ private final int _maxQueueSize;
+ private final long _defaultQueueTimeoutMs;
+ private final ScheduledExecutorService _monitorExecutor;
+ private final ServerMetrics _serverMetrics;
+ private final AtomicBoolean _isShutdown = new AtomicBoolean(false);
+ private long _lastQueueStatusLogTime = 0;
+
+ public ThrottleOnCriticalHeapUsageExecutor(ExecutorService executorService,
+ int maxQueueSize, long defaultQueueTimeoutMs, long monitorIntervalMs) {
+ super(executorService);
+ _maxQueueSize = maxQueueSize;
+ _defaultQueueTimeoutMs = defaultQueueTimeoutMs;
+ // If maxQueueSize <= 0, use an unbounded queue; otherwise, use bounded
queue
+ _taskQueue = (maxQueueSize <= 0) ? new LinkedBlockingQueue<>() : new
LinkedBlockingQueue<>(maxQueueSize);
+
+ // Create a single-threaded scheduler for monitoring heap usage
+ _monitorExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
+ Thread t = new Thread(r, "throttle-heap-monitor");
+ t.setDaemon(true);
+ return t;
+ });
+
+ // Start the monitoring task
+ _monitorExecutor.scheduleWithFixedDelay(this::processQueuedTasks,
+ monitorIntervalMs, monitorIntervalMs, TimeUnit.MILLISECONDS);
+
+ // Cache metrics and register gauge for queue activity tracking
+ _serverMetrics = ServerMetrics.get();
+
_serverMetrics.setOrUpdateGlobalGauge(ServerGauge.THROTTLE_EXECUTOR_QUEUE_SIZE,
() -> (long) _taskQueue.size());
+
+ LOGGER.info(
+ "ThrottleOnCriticalHeapUsageExecutor initialized with queue size: {},
default timeout: {}ms, "
+ + "monitor interval: {}ms (maxQueueSize<=0 => unbounded)",
maxQueueSize, defaultQueueTimeoutMs,
+ monitorIntervalMs);
+ }
+
+ /**
+ * Check if a task should be queued due to critical heap usage
+ * @return true if the task should be queued, false if it can be executed
immediately
+ */
+ protected boolean shouldQueueTask() {
+ QueryThreadContext ctx = QueryThreadContext.getIfAvailable();
+ if (ctx == null) {
+ // No context available on this thread (e.g., monitor thread); do not
throttle
+ return false;
+ }
+ return ctx.getAccountant().throttleQuerySubmission();
+ }
+
+ /**
+ * Process queued tasks when heap usage is below critical level
+ */
+ private void processQueuedTasks() {
+ if (_isShutdown.get()) {
+ return;
+ }
+
+ try {
+ int initialQueueSize = _taskQueue.size();
+ long currentTime = System.currentTimeMillis();
+
+ // Log queue size for monitoring if there are queued tasks (throttled to
prevent log flooding)
+ if (initialQueueSize > 0 && (currentTime - _lastQueueStatusLogTime) >
LOG_THROTTLE_INTERVAL_MS) {
+ LOGGER.info("Processing queued tasks. Current queue size: {}",
initialQueueSize);
+ _lastQueueStatusLogTime = currentTime;
+ // Metrics are exported via ServerMetrics global gauges registered by
callers
+ }
+
+ // Process tasks while the queue head is allowed to run (i.e., its
submitting accountant is not throttling)
+ while (!_taskQueue.isEmpty()) {
+ QueuedTask<?> headTask = _taskQueue.peek();
+ if (headTask == null) {
+ break;
+ }
+
+ if (headTask.isThrottled()) {
+ // Still throttled for the submitting context; stop processing for
now
+ break;
+ }
+
+ // Safe to process this task
+ QueuedTask<?> polledTask = _taskQueue.poll();
+ if (polledTask != null) {
+ long queueTime = System.currentTimeMillis() -
polledTask.getQueueTime();
+
+ if (queueTime > polledTask.getTimeoutMs()) {
+ // Task has timed out in queue
+ polledTask.timeout();
+
_serverMetrics.addMeteredGlobalValue(ServerMeter.THROTTLE_EXECUTOR_TIMED_OUT_TASKS,
1);
+ LOGGER.warn("Task timed out after {}ms in queue (timeout: {}ms)",
queueTime, polledTask.getTimeoutMs());
+ } else {
+ // Submit the task for execution on the underlying executor (avoid
double decoration)
+ QueuedTask<?> submittedTask = polledTask;
+ _executorService.execute(new FromQueueRunnable(() -> {
+ try {
+ submittedTask.execute();
+ _serverMetrics.addMeteredGlobalValue(
+ ServerMeter.THROTTLE_EXECUTOR_PROCESSED_TASKS, 1);
+ LOGGER.debug("Processed queued task after {}ms in queue",
queueTime);
+ } catch (Exception e) {
+ LOGGER.error("Error executing queued task", e);
+ submittedTask.fail(e);
+ }
+ }));
+ }
+ }
+ }
+
+ // Log completion only for significant processing (5+ tasks) to avoid
log spam
+ int finalQueueSize = _taskQueue.size();
+ if (initialQueueSize > 0 && initialQueueSize != finalQueueSize) {
+ int processedThisCycle = initialQueueSize - finalQueueSize;
+ if (processedThisCycle >= 5) {
+ LOGGER.info("Completed processing cycle. Processed {} tasks,
remaining queue size: {}",
+ processedThisCycle, finalQueueSize);
+ } else {
+ LOGGER.debug("Completed processing cycle. Processed {} tasks,
remaining queue size: {}",
+ processedThisCycle, finalQueueSize);
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.error("Error in processQueuedTasks", e);
+ }
+ }
+
+ @Override
+ protected <T> Callable<T> decorate(Callable<T> task) {
+ return () -> {
+ if (shouldQueueTask()) {
+ // Queue the task if heap usage is critical
+ long timeoutMs = computePerTaskTimeoutMs(_defaultQueueTimeoutMs);
+ return queueCallableTask(task, timeoutMs);
+ } else {
+ // Execute immediately if heap usage is normal
+ return task.call();
+ }
+ };
+ }
+
+ @Override
+ protected Runnable decorate(Runnable task) {
+ // If the task comes from this executor's internal queue, avoid
re-applying queue logic
+ if (task instanceof FromQueueRunnable) {
+ return ((FromQueueRunnable) task).getDelegate();
+ }
+
+ // IMPORTANT: Decide throttling at submission time so that queuing is
visible immediately to callers.
+ // This ensures tests (and metrics) observe the full number of queued
tasks, not just the ones picked by
+ // the underlying thread pool yet.
+ if (shouldQueueTask()) {
+ long timeoutMs = computePerTaskTimeoutMs(_defaultQueueTimeoutMs);
+ enqueueRunnableTask(task, timeoutMs);
+ // Return a no-op runnable because the real task will be executed later
by the monitor when dequeued
+ return () -> {
+ };
+ }
+
+ // No throttling: run the task as-is
+ return task;
+ }
+
+ private long computePerTaskTimeoutMs(long configuredTimeoutMs) {
+ try {
+ if (QueryThreadContext.getIfAvailable() != null &&
QueryThreadContext.get().getExecutionContext() != null) {
+ long remaining =
QueryThreadContext.get().getExecutionContext().getActiveDeadlineMs()
+ - System.currentTimeMillis();
+ if (remaining <= 0) {
+ return 0L;
+ }
+ // Do not wait in the queue longer than the query's remaining active
deadline
+ return Math.min(configuredTimeoutMs, remaining);
+ }
+ } catch (Throwable t) {
+ // Be conservative and use configured timeout if context is not available
+ LOGGER.debug("Failed to compute per-task timeout from thread context;
using configured timeout.", t);
+ }
+ return configuredTimeoutMs;
+ }
+
+ /**
+ * Queue a callable task and wait for its execution
+ */
+ private <T> T queueCallableTask(Callable<T> task, long timeoutMs)
+ throws Exception {
+ QueuedCallableTask<T> queuedTask = new QueuedCallableTask<>(task,
timeoutMs);
+
+ if (_maxQueueSize > 0 && !_taskQueue.offer(queuedTask)) {
+ // Queue is full
+ throw QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.asException(
+ "Task queue is full (size: " + _maxQueueSize + ") due to high heap
usage.");
+ }
+
+
_serverMetrics.addMeteredGlobalValue(ServerMeter.THROTTLE_EXECUTOR_QUEUED_TASKS,
1);
+ LOGGER.debug("Queued callable task, queue size: {}", _taskQueue.size());
+
+ // Wait for the task to complete or timeout
+ return queuedTask.get(timeoutMs, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Queue a runnable task and wait for its execution
+ */
+ private void queueRunnableTask(Runnable task, long timeoutMs) {
+ QueuedRunnableTask queuedTask = new QueuedRunnableTask(task, timeoutMs);
+
+ if (_maxQueueSize > 0 && !_taskQueue.offer(queuedTask)) {
+ // Queue is full
+ throw QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.asException(
+ "Task queue is full (size: " + _maxQueueSize + ") due to high heap
usage.");
+ }
+
+
_serverMetrics.addMeteredGlobalValue(ServerMeter.THROTTLE_EXECUTOR_QUEUED_TASKS,
1);
+ LOGGER.debug("Queued runnable task, queue size: {}", _taskQueue.size());
+
+ try {
+ // Wait for the task to complete or timeout
+ queuedTask.get(timeoutMs, TimeUnit.MILLISECONDS);
+ } catch (Exception e) {
+ if (e.getCause() instanceof RuntimeException) {
+ throw (RuntimeException) e.getCause();
+ } else {
+ throw new RuntimeException("Error executing queued task", e);
+ }
+ }
+ }
+
+ /**
+ * Enqueue a runnable task without waiting for its completion. Used when
queuing at submission time.
+ */
+ private void enqueueRunnableTask(Runnable task, long timeoutMs) {
+ QueuedRunnableTask queuedTask = new QueuedRunnableTask(task, timeoutMs);
+
+ if (_maxQueueSize > 0 && !_taskQueue.offer(queuedTask)) {
+ throw QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.asException(
+ "Task queue is full (size: " + _maxQueueSize + ") due to high heap
usage.");
+ }
+
+
_serverMetrics.addMeteredGlobalValue(ServerMeter.THROTTLE_EXECUTOR_QUEUED_TASKS,
1);
+ LOGGER.debug("Enqueued runnable task (non-blocking), queue size: {}",
_taskQueue.size());
+ }
+
+ @Override
+ public void shutdown() {
+ _isShutdown.set(true);
+ _monitorExecutor.shutdownNow();
+
+ // Allow the monitor thread to complete current processing
+ try {
+ if (!_monitorExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
+ LOGGER.warn("Monitor executor did not terminate within the timeout
period.");
+ }
+ } catch (InterruptedException e) {
+ LOGGER.warn("Interrupted while waiting for monitor executor to
terminate.", e);
+ Thread.currentThread().interrupt();
+ }
+
+ // Cancel remaining tasks in the queue - fail them with shutdown exception
+ int remainingTasks = 0; // keep local counter for logging
+ while (!_taskQueue.isEmpty()) {
+ QueuedTask<?> task = _taskQueue.poll();
+ if (task != null) {
+ remainingTasks++;
+ task.fail(new IllegalStateException("Executor is shutting down"));
+ _serverMetrics.addMeteredGlobalValue(
+ ServerMeter.THROTTLE_EXECUTOR_SHUTDOWN_CANCELED_TASKS, 1);
+ }
+ }
+
+ super.shutdown();
+
+ LOGGER.info("ThrottleOnCriticalHeapUsageExecutor shutdown. Remaining
queued tasks: {}", remainingTasks);
+ }
+
+ public int getQueueSize() {
+ return _taskQueue.size();
+ }
+
+ /**
+ * Base class for queued tasks.
+ *
+ * <p>The {@code QueuedTask} class represents a task that can be queued for
execution when the system is under
+ * critical heap usage. It provides a common interface for handling task
execution, timeouts, and failures.</p>
+ *
+ * <p>Lifecycle:
+ * <ul>
+ * <li>Tasks are created and added to the queue when heap usage is
critical.</li>
+ * <li>When heap usage drops below the critical level, tasks are dequeued
and executed.</li>
+ * <li>If a task remains in the queue beyond a configured timeout, the
{@code timeout()} method is invoked.</li>
+ * <li>If an exception occurs during execution, the {@code fail(Exception
e)} method is invoked.</li>
+ * </ul>
+ * </p>
+ *
+ * <p>Thread-safety:
+ * <ul>
+ * <li>Instances of {@code QueuedTask} are not inherently thread-safe and
should be accessed in a thread-safe
+ * manner by the enclosing executor.</li>
+ * <li>The enclosing {@code ThrottleOnCriticalHeapUsageExecutor} ensures
proper synchronization when accessing
+ * and modifying the queue.</li>
+ * </ul>
+ * </p>
+ */
+ private abstract static class QueuedTask<T> {
+ private final long _queueTime;
+ private final long _timeoutMs;
+ private final Object _lock = new Object();
+ private final ThreadAccountant _accountant; // submitting thread's
accountant (may be null)
+ private T _result;
+ private Exception _exception;
+ private boolean _completed = false;
+
+ protected QueuedTask(long timeoutMs) {
+ _queueTime = System.currentTimeMillis();
+ _timeoutMs = timeoutMs;
+ QueryThreadContext ctx = QueryThreadContext.getIfAvailable();
+ _accountant = (ctx != null) ? ctx.getAccountant() : null;
+ }
+
+ public long getQueueTime() {
+ return _queueTime;
+ }
+
+ public long getTimeoutMs() {
+ return _timeoutMs;
+ }
+
+ /** Returns true if the originating context is still throttling
submissions. */
+ public boolean isThrottled() {
+ return _accountant != null && _accountant.throttleQuerySubmission();
+ }
+
+ protected final void completeWithResult(T result) {
+ synchronized (_lock) {
+ _result = result;
+ _completed = true;
+ _lock.notifyAll();
+ }
+ }
+
+ public abstract void execute();
+
+ public void timeout() {
+ synchronized (_lock) {
+ _exception = QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.asException(
+ "Task timed out in queue after " + getTimeoutMs() + "ms due to
high heap usage.");
+ _completed = true;
+ _lock.notifyAll();
+ }
+ }
+
+ public void fail(Exception e) {
+ synchronized (_lock) {
+ _exception = e;
+ _completed = true;
+ _lock.notifyAll();
+ }
+ }
+
+ public T get(long timeout, TimeUnit unit)
+ throws Exception {
+ long timeoutMs = unit.toMillis(timeout);
+ long startTime = System.currentTimeMillis();
+
+ synchronized (_lock) {
+ while (!_completed) {
+ long elapsedTime = System.currentTimeMillis() - startTime;
+ long remainingTime = timeoutMs - elapsedTime;
+ if (remainingTime <= 0) {
+ break;
+ }
+ try {
+ _lock.wait(remainingTime);
+ } catch (InterruptedException e) {
+ // Preserve interrupt status for calling code
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted while waiting for task
completion", e);
+ }
+ }
+
+ if (!_completed) {
+ throw QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.asException(
+ "Task timed out after " + timeoutMs + "ms waiting in queue.");
+ }
+
+ if (_exception != null) {
+ throw _exception;
+ }
+
+ return _result;
+ }
+ }
+ }
+
+ /**
+ * Wrapper for callable tasks that can be queued
+ */
+ private static class QueuedCallableTask<T> extends QueuedTask<T> {
+ private final Callable<T> _task;
+
+ public QueuedCallableTask(Callable<T> task, long timeoutMs) {
+ super(timeoutMs);
+ _task = task;
+ }
+
+ @Override
+ public void execute() {
+ try {
+ T result = _task.call();
+ completeWithResult(result);
+ } catch (Exception e) {
+ fail(e);
+ }
+ }
+ }
+
+ /**
+ * Wrapper for runnable tasks that can be queued
+ */
+ private static class QueuedRunnableTask extends QueuedTask<Void> {
+ private final Runnable _task;
+
+ public QueuedRunnableTask(Runnable task, long timeoutMs) {
+ super(timeoutMs);
+ _task = task;
+ }
+
+ @Override
+ public void execute() {
+ try {
+ _task.run();
+ completeWithResult(null);
+ } catch (Exception e) {
+ fail(e);
+ }
+ }
+ }
+
+ /** Marker wrapper for tasks dispatched from the internal queue to avoid
re-decoration. */
+ private static final class FromQueueRunnable implements Runnable {
+ private final Runnable _delegate;
+
+ FromQueueRunnable(Runnable delegate) {
+ _delegate = delegate;
+ }
+
+ @Override
+ public void run() {
+ _delegate.run();
+ }
+
+ public Runnable getDelegate() {
+ return _delegate;
+ }
+ }
+
+ /**
+ * Factory to conditionally wrap an executor with queue-based throttling
based on config.
+ * Also handles config parsing and logging as requested in review.
+ */
+ public static ExecutorService maybeWrap(ExecutorService base,
PinotConfiguration serverConf, String logContext) {
+ boolean enabled = serverConf.getProperty(
+
CommonConstants.Server.CONFIG_OF_ENABLE_QUERY_SCHEDULER_THROTTLING_ON_HEAP_USAGE,
+
CommonConstants.Server.DEFAULT_ENABLE_QUERY_SCHEDULER_THROTTLING_ON_HEAP_USAGE);
+ if (!enabled) {
+ return base;
+ }
+
+ int maxSize =
serverConf.getProperty(CommonConstants.Server.CONFIG_OF_HEAP_USAGE_THROTTLE_QUEUE_MAX_SIZE,
+ CommonConstants.Server.DEFAULT_HEAP_USAGE_THROTTLE_QUEUE_MAX_SIZE);
+ long timeoutMs =
serverConf.getProperty(CommonConstants.Server.CONFIG_OF_HEAP_USAGE_THROTTLE_QUEUE_TIMEOUT_MS,
+ CommonConstants.Server.DEFAULT_HEAP_USAGE_THROTTLE_QUEUE_TIMEOUT_MS);
+ long monitorIntervalMs =
+
serverConf.getProperty(CommonConstants.Server.CONFIG_OF_HEAP_USAGE_THROTTLE_MONITOR_INTERVAL_MS,
+
CommonConstants.Server.DEFAULT_HEAP_USAGE_THROTTLE_MONITOR_INTERVAL_MS);
+ LOGGER.info(
+ "Enable heap usage throttling for {}. maxSize={}, timeoutMs={},
monitorIntervalMs={} (maxSize<=0 => unbounded)",
+ logContext, maxSize, timeoutMs, monitorIntervalMs);
+ return new ThrottleOnCriticalHeapUsageExecutor(base, maxSize, timeoutMs,
monitorIntervalMs);
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java
index 3edefae0d0a..4f156b52feb 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java
@@ -24,11 +24,11 @@ import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
+import org.apache.pinot.core.executor.ThrottleOnCriticalHeapUsageExecutor;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.scheduler.SchedulerGroupAccountant;
import org.apache.pinot.core.util.trace.TracedThreadFactory;
import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.executor.ThrottleOnCriticalHeapUsageExecutor;
import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -86,20 +86,16 @@ public abstract class ResourceManager {
CommonConstants.ExecutorService.PINOT_QUERY_RUNNER_NAME_FORMAT);
ExecutorService runnerService =
Executors.newFixedThreadPool(_numQueryRunnerThreads, queryRunnerFactory);
- if
(config.getProperty(CommonConstants.Server.CONFIG_OF_ENABLE_QUERY_SCHEDULER_THROTTLING_ON_HEAP_USAGE,
-
CommonConstants.Server.DEFAULT_ENABLE_QUERY_SCHEDULER_THROTTLING_ON_HEAP_USAGE))
{
- runnerService = new ThrottleOnCriticalHeapUsageExecutor(runnerService);
- }
+ runnerService = ThrottleOnCriticalHeapUsageExecutor.maybeWrap(
+ runnerService, config, "query runner");
_queryRunners = MoreExecutors.listeningDecorator(runnerService);
// pqw -> pinot query workers
ThreadFactory queryWorkersFactory = new
TracedThreadFactory(Thread.NORM_PRIORITY, false,
CommonConstants.ExecutorService.PINOT_QUERY_WORKER_NAME_FORMAT);
ExecutorService workerService =
Executors.newFixedThreadPool(_numQueryWorkerThreads, queryWorkersFactory);
- if
(config.getProperty(CommonConstants.Server.CONFIG_OF_ENABLE_QUERY_SCHEDULER_THROTTLING_ON_HEAP_USAGE,
-
CommonConstants.Server.DEFAULT_ENABLE_QUERY_SCHEDULER_THROTTLING_ON_HEAP_USAGE))
{
- workerService = new ThrottleOnCriticalHeapUsageExecutor(workerService);
- }
+ workerService = ThrottleOnCriticalHeapUsageExecutor.maybeWrap(
+ workerService, config, "query worker");
_queryWorkers = MoreExecutors.listeningDecorator(workerService);
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/ThrottleOnCriticalHeapUsageExecutorTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/ThrottleOnCriticalHeapUsageExecutorTest.java
new file mode 100644
index 00000000000..e1f3b5c983a
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/ThrottleOnCriticalHeapUsageExecutorTest.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.executor;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.core.executor.ThrottleOnCriticalHeapUsageExecutor;
+import org.apache.pinot.spi.accounting.ThreadAccountant;
+import org.apache.pinot.spi.metrics.PinotMetricUtils;
+import org.apache.pinot.spi.query.QueryExecutionContext;
+import org.apache.pinot.spi.query.QueryThreadContext;
+import org.mockito.Mockito;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class ThrottleOnCriticalHeapUsageExecutorTest {
+ private ExecutorService _base;
+
+ @BeforeMethod
+ public void setUp() {
+ _base = Executors.newFixedThreadPool(8);
+ ServerMetrics.deregister();
+ ServerMetrics.register(new
ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()));
+ }
+
+ @AfterMethod
+ public void tearDown() {
+ if (_base != null) {
+ _base.shutdownNow();
+ }
+ ServerMetrics.deregister();
+ }
+
+ @Test
+ public void testMultiThreadThrottlingQueueAndRecovery()
+ throws Exception {
+ // Custom accountant toggling throttle
+ AtomicBoolean throttle = new AtomicBoolean(true);
+ ThreadAccountant accountant = Mockito.mock(ThreadAccountant.class);
+ Mockito.when(accountant.throttleQuerySubmission()).thenAnswer(inv ->
throttle.get());
+
+ // Open a query context for submitting tasks
+ try (QueryThreadContext ignored =
QueryThreadContext.open(QueryExecutionContext.forMseTest(), accountant)) {
+ ThrottleOnCriticalHeapUsageExecutor ex = new
ThrottleOnCriticalHeapUsageExecutor(
+ _base, /*maxQueueSize*/ 100, /*timeout*/
TimeUnit.SECONDS.toMillis(5), /*monitor*/ 50);
+
+ int numTasks = 50;
+ AtomicInteger ranCount = new AtomicInteger(0);
+
+ // Submit multiple tasks concurrently under throttling; they should queue
+ for (int i = 0; i < numTasks; i++) {
+ ex.execute(() -> {
+ ranCount.incrementAndGet();
+ });
+ }
+
+ // Ensure tasks are queued (not run yet)
+ Thread.sleep(100);
+ assertEquals(ranCount.get(), 0);
+ assertEquals(ex.getQueueSize(), numTasks);
+
+ // Release throttling and wait for background monitor to drain
+ throttle.set(false);
+
+ // Wait up to a few seconds for all tasks to run
+ long deadline = System.currentTimeMillis() +
TimeUnit.SECONDS.toMillis(5);
+ while (System.currentTimeMillis() < deadline && ranCount.get() <
numTasks) {
+ Thread.sleep(20);
+ }
+
+ assertEquals(ranCount.get(), numTasks, "All queued tasks should run
after recovery");
+
+ // Metrics should reflect queued and processed counts
+ // Note: Using getMeteredValue increments is intrusive; validate no
exceptions and queue drained
+ assertEquals(ex.getQueueSize(), 0);
+
+ ex.shutdown();
+ }
+ }
+}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index cbb5086f8ac..5626cd4d3a0 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -43,6 +43,7 @@ import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.proto.Worker;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.executor.ThrottleOnCriticalHeapUsageExecutor;
import org.apache.pinot.core.query.executor.QueryExecutor;
import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl;
import org.apache.pinot.query.mailbox.MailboxService;
@@ -78,7 +79,6 @@ import org.apache.pinot.spi.exception.QueryException;
import org.apache.pinot.spi.executor.ExecutorServiceUtils;
import org.apache.pinot.spi.executor.HardLimitExecutor;
import org.apache.pinot.spi.executor.MetricsExecutor;
-import org.apache.pinot.spi.executor.ThrottleOnCriticalHeapUsageExecutor;
import org.apache.pinot.spi.query.QueryExecutionContext;
import org.apache.pinot.spi.query.QueryThreadContext;
import org.apache.pinot.spi.query.QueryThreadExceedStrategy;
@@ -213,16 +213,12 @@ public class QueryRunner {
Server.DEFAULT_MSE_MAX_EXECUTION_THREADS_EXCEED_STRATEGY);
exceedStrategy =
QueryThreadExceedStrategy.valueOf(Server.DEFAULT_MSE_MAX_EXECUTION_THREADS_EXCEED_STRATEGY);
}
-
LOGGER.info("Setting multi-stage executor hardLimit: {} exceedStrategy:
{}", hardLimit, exceedStrategy);
_executorService = new HardLimitExecutor(hardLimit, _executorService,
exceedStrategy);
}
- if
(serverConf.getProperty(Server.CONFIG_OF_ENABLE_QUERY_SCHEDULER_THROTTLING_ON_HEAP_USAGE,
- Server.DEFAULT_ENABLE_QUERY_SCHEDULER_THROTTLING_ON_HEAP_USAGE)) {
- LOGGER.info("Enable OOM Throttling on critical heap usage for
multi-stage executor");
- _executorService = new
ThrottleOnCriticalHeapUsageExecutor(_executorService);
- }
+ _executorService = ThrottleOnCriticalHeapUsageExecutor.maybeWrap(
+ _executorService, serverConf, "multi-stage executor");
_opChainScheduler = new
OpChainSchedulerService(instanceDataManager.getInstanceId(), _executorService,
serverConf);
_mailboxService = new MailboxService(hostname, port, InstanceType.SERVER,
serverConf, tlsConfig);
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/executor/DecoratorExecutorService.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/executor/DecoratorExecutorService.java
index a92b86b6101..5da04765f01 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/executor/DecoratorExecutorService.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/executor/DecoratorExecutorService.java
@@ -46,7 +46,7 @@ import javax.annotation.Nullable;
* TODO: Convert this class and its usages into an Executor instead of an
ExecutorService
*/
public abstract class DecoratorExecutorService implements ExecutorService {
- private final ExecutorService _executorService;
+ protected final ExecutorService _executorService;
private final Consumer<Future<?>> _onSubmit;
public DecoratorExecutorService(ExecutorService executorService) {
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutor.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutor.java
deleted file mode 100644
index b251aaa4e88..00000000000
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutor.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.spi.executor;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import org.apache.pinot.spi.accounting.ThreadAccountant;
-import org.apache.pinot.spi.exception.QueryErrorCode;
-import org.apache.pinot.spi.query.QueryThreadContext;
-
-
-/**
- * An Executor that throttles task submission when the heap usage is critical.
- * Heap Usage level is obtained from {@link
ThreadAccountant#throttleQuerySubmission()}.
- */
-public class ThrottleOnCriticalHeapUsageExecutor extends
DecoratorExecutorService {
-
- public ThrottleOnCriticalHeapUsageExecutor(ExecutorService executorService) {
- super(executorService);
- }
-
- protected void checkTaskAllowed() {
- if (QueryThreadContext.get().getAccountant().throttleQuerySubmission()) {
- throw QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.asException("Tasks
throttled due to high heap usage.");
- }
- }
-
- @Override
- protected <T> Callable<T> decorate(Callable<T> task) {
- checkTaskAllowed();
- return () -> {
- checkTaskAllowed();
- return task.call();
- };
- }
-
- @Override
- protected Runnable decorate(Runnable task) {
- checkTaskAllowed();
- return () -> {
- checkTaskAllowed();
- task.run();
- };
- }
-}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 8d6e4012ee8..ab3ec338840 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -1119,6 +1119,17 @@ public class CommonConstants {
QUERY_EXECUTOR_CONFIG_PREFIX + "." + MAX_EXECUTION_THREADS;
public static final int DEFAULT_QUERY_EXECUTOR_MAX_EXECUTION_THREADS = -1;
// Use number of CPU cores
+ // OOM protection: heap usage throttle configuration
+ public static final String CONFIG_OF_HEAP_USAGE_THROTTLE_QUEUE_MAX_SIZE =
+ QUERY_EXECUTOR_CONFIG_PREFIX + ".heap.usage.throttle.queue.maxSize";
+ public static final int DEFAULT_HEAP_USAGE_THROTTLE_QUEUE_MAX_SIZE = 1000;
+ public static final String CONFIG_OF_HEAP_USAGE_THROTTLE_QUEUE_TIMEOUT_MS =
+ QUERY_EXECUTOR_CONFIG_PREFIX + ".heap.usage.throttle.queue.timeoutMs";
+ public static final long DEFAULT_HEAP_USAGE_THROTTLE_QUEUE_TIMEOUT_MS =
30_000L;
+ public static final String
CONFIG_OF_HEAP_USAGE_THROTTLE_MONITOR_INTERVAL_MS =
+ QUERY_EXECUTOR_CONFIG_PREFIX +
".heap.usage.throttle.monitorIntervalMs";
+ public static final long DEFAULT_HEAP_USAGE_THROTTLE_MONITOR_INTERVAL_MS =
1_000L;
+
// Group-by query related configs
public static final String NUM_GROUPS_LIMIT = "num.groups.limit";
public static final String CONFIG_OF_QUERY_EXECUTOR_NUM_GROUPS_LIMIT =
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]