Jackie-Jiang commented on code in PR #16409:
URL: https://github.com/apache/pinot/pull/16409#discussion_r2427604323
##########
pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java:
##########
@@ -88,7 +88,15 @@ public ResourceManager(PinotConfiguration config) {
ExecutorService runnerService =
Executors.newFixedThreadPool(_numQueryRunnerThreads, queryRunnerFactory);
if
(config.getProperty(CommonConstants.Server.CONFIG_OF_ENABLE_QUERY_SCHEDULER_THROTTLING_ON_HEAP_USAGE,
CommonConstants.Server.DEFAULT_ENABLE_QUERY_SCHEDULER_THROTTLING_ON_HEAP_USAGE))
{
- runnerService = new ThrottleOnCriticalHeapUsageExecutor(runnerService);
+ int maxSize =
config.getProperty(CommonConstants.Server.CONFIG_OF_OOM_THROTTLE_QUEUE_MAX_SIZE,
Review Comment:
We can move the config parsing logic into
`ThrottleOnCriticalHeapUsageExecutor`
##########
pinot-core/src/main/java/org/apache/pinot/core/executor/ThrottleOnCriticalHeapUsageExecutor.java:
##########
@@ -0,0 +1,463 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.executor;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pinot.common.metrics.ServerGauge;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.spi.accounting.ThreadAccountant;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.executor.DecoratorExecutorService;
+import org.apache.pinot.spi.metrics.PinotMeter;
+import org.apache.pinot.spi.query.QueryThreadContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * An Executor that queues tasks when the heap usage is critical instead of
rejecting them.
+ * Heap Usage level is obtained from {@link
ThreadAccountant#throttleQuerySubmission()}.
+ *
+ * Features:
+ * - Tasks are queued when heap usage is critical
+ * - Queued tasks are processed when heap usage drops below critical level
+ * - Configurable queue size and timeout (global default or per-task)
+ * - Background monitoring of heap usage to process queued tasks
+ */
+public class ThrottleOnCriticalHeapUsageExecutor extends
DecoratorExecutorService {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ThrottleOnCriticalHeapUsageExecutor.class);
+
+ // Default configuration values
+ // Defaults are kept near config in CommonConstants; no local defaults
needed here
+ private static final long LOG_THROTTLE_INTERVAL_MS = 30000; // Log queue
status every 30 seconds
+
+ private final BlockingQueue<QueuedTask<?>> _taskQueue;
+ private final int _maxQueueSize;
+ private final long _defaultQueueTimeoutMs;
+ private final ScheduledExecutorService _monitorExecutor;
+ private final AtomicBoolean _isShutdown = new AtomicBoolean(false);
+ private volatile long _lastQueueStatusLogTime = 0;
+ private final PinotMeter _queuedTasksMeter;
Review Comment:
We don't usually update meters this way. Use `addMeteredGlobalValue()` instead.
##########
pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java:
##########
@@ -210,6 +210,16 @@ public enum ServerMeter implements AbstractMetrics.Meter {
// reingestion metrics
SEGMENT_REINGESTION_FAILURE("segments", false),
+ // ThrottleOnCriticalHeapUsageExecutor meters
+ THROTTLE_EXECUTOR_QUEUED_TASKS_TOTAL("count", true,
Review Comment:
Given it is a meter, we don't need to put `_TOTAL` as a suffix
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java:
##########
@@ -212,15 +212,25 @@ public void init(PinotConfiguration serverConf,
InstanceDataManager instanceData
Server.DEFAULT_MSE_MAX_EXECUTION_THREADS_EXCEED_STRATEGY);
exceedStrategy =
QueryThreadExceedStrategy.valueOf(Server.DEFAULT_MSE_MAX_EXECUTION_THREADS_EXCEED_STRATEGY);
}
-
LOGGER.info("Setting multi-stage executor hardLimit: {} exceedStrategy:
{}", hardLimit, exceedStrategy);
_executorService = new HardLimitExecutor(hardLimit, _executorService,
exceedStrategy);
}
if
(serverConf.getProperty(Server.CONFIG_OF_ENABLE_QUERY_SCHEDULER_THROTTLING_ON_HEAP_USAGE,
Server.DEFAULT_ENABLE_QUERY_SCHEDULER_THROTTLING_ON_HEAP_USAGE)) {
- LOGGER.info("Enable OOM Throttling on critical heap usage for
multi-stage executor");
- _executorService = new
ThrottleOnCriticalHeapUsageExecutor(_executorService);
+ int maxSize =
serverConf.getProperty(Server.CONFIG_OF_OOM_THROTTLE_QUEUE_MAX_SIZE,
Review Comment:
Move the config parsing and logging logic into
`ThrottleOnCriticalHeapUsageExecutor`
##########
pinot-core/src/main/java/org/apache/pinot/core/executor/ThrottleOnCriticalHeapUsageExecutor.java:
##########
@@ -0,0 +1,463 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.executor;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pinot.common.metrics.ServerGauge;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.spi.accounting.ThreadAccountant;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.executor.DecoratorExecutorService;
+import org.apache.pinot.spi.metrics.PinotMeter;
+import org.apache.pinot.spi.query.QueryThreadContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * An Executor that queues tasks when the heap usage is critical instead of
rejecting them.
+ * Heap Usage level is obtained from {@link
ThreadAccountant#throttleQuerySubmission()}.
+ *
+ * Features:
+ * - Tasks are queued when heap usage is critical
+ * - Queued tasks are processed when heap usage drops below critical level
+ * - Configurable queue size and timeout (global default or per-task)
+ * - Background monitoring of heap usage to process queued tasks
+ */
+public class ThrottleOnCriticalHeapUsageExecutor extends
DecoratorExecutorService {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ThrottleOnCriticalHeapUsageExecutor.class);
+
+ // Default configuration values
+ // Defaults are kept near config in CommonConstants; no local defaults
needed here
+ private static final long LOG_THROTTLE_INTERVAL_MS = 30000; // Log queue
status every 30 seconds
+
+ private final BlockingQueue<QueuedTask<?>> _taskQueue;
+ private final int _maxQueueSize;
+ private final long _defaultQueueTimeoutMs;
+ private final ScheduledExecutorService _monitorExecutor;
+ private final AtomicBoolean _isShutdown = new AtomicBoolean(false);
+ private volatile long _lastQueueStatusLogTime = 0;
+ private final PinotMeter _queuedTasksMeter;
+ private final PinotMeter _processedTasksMeter;
+ private final PinotMeter _timedOutTasksMeter;
+ private final PinotMeter _shutdownCanceledTasksMeter;
+
+ public ThrottleOnCriticalHeapUsageExecutor(ExecutorService executorService,
+ int maxQueueSize, long defaultQueueTimeoutMs, long monitorIntervalMs) {
+ super(executorService);
+ _maxQueueSize = maxQueueSize;
+ _defaultQueueTimeoutMs = defaultQueueTimeoutMs;
+ // If maxQueueSize <= 0, use an unbounded queue; otherwise, use bounded
queue
+ _taskQueue = (maxQueueSize <= 0) ? new LinkedBlockingQueue<>() : new
LinkedBlockingQueue<>(maxQueueSize);
+
+ // Create a single-threaded scheduler for monitoring heap usage
+ _monitorExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
+ Thread t = new Thread(r, "throttle-heap-monitor");
+ t.setDaemon(true);
+ return t;
+ });
+
+ // Start the monitoring task
+ _monitorExecutor.scheduleWithFixedDelay(this::processQueuedTasks,
+ monitorIntervalMs, monitorIntervalMs, TimeUnit.MILLISECONDS);
+
+ // Register meters for queue activity tracking
+ ServerMetrics metrics = ServerMetrics.get();
+ _queuedTasksMeter =
metrics.getMeteredValue(ServerMeter.THROTTLE_EXECUTOR_QUEUED_TASKS_TOTAL);
+ _processedTasksMeter =
metrics.getMeteredValue(ServerMeter.THROTTLE_EXECUTOR_PROCESSED_TASKS_TOTAL);
+ _timedOutTasksMeter =
metrics.getMeteredValue(ServerMeter.THROTTLE_EXECUTOR_TIMED_OUT_TASKS_TOTAL);
+ _shutdownCanceledTasksMeter = metrics.getMeteredValue(
+ ServerMeter.THROTTLE_EXECUTOR_SHUTDOWN_CANCELED_TASKS_TOTAL);
+
+ // Register a global gauge for current queue size
+ metrics.setOrUpdateGlobalGauge(ServerGauge.THROTTLE_EXECUTOR_QUEUE_SIZE,
() -> (long) _taskQueue.size());
+
+ LOGGER.info(
+ "ThrottleOnCriticalHeapUsageExecutor initialized with queue size: {},
default timeout: {}ms, "
+ + "monitor interval: {}ms (maxQueueSize<=0 => unbounded)",
maxQueueSize, defaultQueueTimeoutMs,
+ monitorIntervalMs);
+ }
+
+ /**
+ * Check if a task should be queued due to critical heap usage
+ * @return true if the task should be queued, false if it can be executed
immediately
+ */
+ protected boolean shouldQueueTask() {
+ return QueryThreadContext.get().getAccountant().throttleQuerySubmission();
+ }
+
+ /**
+ * Process queued tasks when heap usage is below critical level
+ */
+ private void processQueuedTasks() {
+ if (_isShutdown.get()) {
+ return;
+ }
+
+ try {
+ int initialQueueSize = _taskQueue.size();
+ long currentTime = System.currentTimeMillis();
+
+ // Log queue size for monitoring if there are queued tasks (throttled to
prevent log flooding)
+ if (initialQueueSize > 0 && (currentTime - _lastQueueStatusLogTime) >
LOG_THROTTLE_INTERVAL_MS) {
+ LOGGER.info("Processing queued tasks. Current queue size: {}, Queued:
{}, Processed: {}, "
+ + "Timed out: {}, Shutdown canceled: {}",
+ initialQueueSize, _queuedTasksMeter.count(),
_processedTasksMeter.count(),
+ _timedOutTasksMeter.count(), _shutdownCanceledTasksMeter.count());
+ _lastQueueStatusLogTime = currentTime;
+ // Metrics are exported via ServerMetrics global gauges registered by
callers
+ }
+
+ // Process tasks while heap usage is not critical and queue is not empty
+ while (!shouldQueueTask() && !_taskQueue.isEmpty()) {
+ QueuedTask<?> queuedTask = _taskQueue.poll();
+ if (queuedTask != null) {
+ long queueTime = System.currentTimeMillis() -
queuedTask.getQueueTime();
+
+ if (queueTime > queuedTask.getTimeoutMs()) {
+ // Task has timed out in queue
+ queuedTask.timeout();
+ _timedOutTasksMeter.mark();
+ LOGGER.warn("Task timed out after {}ms in queue (timeout: {}ms)",
queueTime, queuedTask.getTimeoutMs());
+ } else {
+ // Submit the task for execution on the underlying executor (via
this decorator)
+ execute(() -> {
+ try {
+ queuedTask.execute();
+ _processedTasksMeter.mark();
+ LOGGER.debug("Processed queued task after {}ms in queue",
queueTime);
+ } catch (Exception e) {
+ LOGGER.error("Error executing queued task", e);
+ queuedTask.fail(e);
+ }
+ });
+ }
+ }
+ }
+
+ // Log completion only for significant processing (5+ tasks) to avoid
log spam
+ int finalQueueSize = _taskQueue.size();
+ if (initialQueueSize > 0 && initialQueueSize != finalQueueSize) {
+ int processedThisCycle = initialQueueSize - finalQueueSize;
+ if (processedThisCycle >= 5) {
+ LOGGER.info("Completed processing cycle. Processed {} tasks,
remaining queue size: {}",
+ processedThisCycle, finalQueueSize);
+ } else {
+ LOGGER.debug("Completed processing cycle. Processed {} tasks,
remaining queue size: {}",
+ processedThisCycle, finalQueueSize);
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.error("Error in processQueuedTasks", e);
+ }
+ }
+
+ @Override
+ protected <T> Callable<T> decorate(Callable<T> task) {
+ return () -> {
+ if (shouldQueueTask()) {
+ // Queue the task if heap usage is critical
+ long timeoutMs = computePerTaskTimeoutMs(_defaultQueueTimeoutMs);
+ return queueCallableTask(task, timeoutMs);
+ } else {
+ // Execute immediately if heap usage is normal
+ return task.call();
+ }
+ };
+ }
+
+ @Override
+ protected Runnable decorate(Runnable task) {
+ return () -> {
+ if (shouldQueueTask()) {
+ // Queue the task if heap usage is critical
+ long timeoutMs = computePerTaskTimeoutMs(_defaultQueueTimeoutMs);
+ queueRunnableTask(task, timeoutMs);
+ } else {
+ // Execute immediately if heap usage is normal
+ task.run();
+ }
+ };
+ }
+
+ private long computePerTaskTimeoutMs(long configuredTimeoutMs) {
+ try {
+ if (QueryThreadContext.getIfAvailable() != null &&
QueryThreadContext.get().getExecutionContext() != null) {
+ long remaining =
QueryThreadContext.get().getExecutionContext().getActiveDeadlineMs()
+ - System.currentTimeMillis();
+ if (remaining <= 0) {
+ return 0L;
+ }
+ // Do not wait in the queue longer than the query's remaining active
deadline
+ return Math.min(configuredTimeoutMs, remaining);
+ }
+ } catch (Throwable t) {
+ // Be conservative and use configured timeout if context is not available
+ LOGGER.debug("Failed to compute per-task timeout from thread context;
using configured timeout.", t);
+ }
+ return configuredTimeoutMs;
+ }
+
+ /**
+ * Queue a callable task and wait for its execution
+ */
+ private <T> T queueCallableTask(Callable<T> task, long timeoutMs)
+ throws Exception {
+ QueuedCallableTask<T> queuedTask = new QueuedCallableTask<>(task,
timeoutMs);
+
+ if (_maxQueueSize > 0 && !_taskQueue.offer(queuedTask)) {
+ // Queue is full
+ throw QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.asException(
+ "Task queue is full (size: " + _maxQueueSize + ") due to high heap
usage.");
+ }
+
+ _queuedTasksMeter.mark();
+ LOGGER.debug("Queued callable task, queue size: {}", _taskQueue.size());
+
+ // Wait for the task to complete or timeout
+ return queuedTask.get(timeoutMs, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Queue a runnable task and wait for its execution
+ */
+ private void queueRunnableTask(Runnable task, long timeoutMs) {
+ QueuedRunnableTask queuedTask = new QueuedRunnableTask(task, timeoutMs);
+
+ if (_maxQueueSize > 0 && !_taskQueue.offer(queuedTask)) {
+ // Queue is full
+ throw QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.asException(
+ "Task queue is full (size: " + _maxQueueSize + ") due to high heap
usage.");
+ }
+
+ _queuedTasksMeter.mark();
+ LOGGER.debug("Queued runnable task, queue size: {}", _taskQueue.size());
+
+ try {
+ // Wait for the task to complete or timeout
+ queuedTask.get(timeoutMs, TimeUnit.MILLISECONDS);
+ } catch (Exception e) {
+ if (e.getCause() instanceof RuntimeException) {
+ throw (RuntimeException) e.getCause();
+ } else {
+ throw new RuntimeException("Error executing queued task", e);
+ }
+ }
+ }
+
+ @Override
+ public void shutdown() {
+ _isShutdown.set(true);
+ _monitorExecutor.shutdownNow();
+
+ // Allow the monitor thread to complete current processing
+ try {
+ if (!_monitorExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
+ LOGGER.warn("Monitor executor did not terminate within the timeout
period.");
+ }
+ } catch (InterruptedException e) {
+ LOGGER.warn("Interrupted while waiting for monitor executor to
terminate.", e);
+ Thread.currentThread().interrupt();
+ }
+
+ // Cancel remaining tasks in the queue - fail them with shutdown exception
+ int remainingTasks = 0; // keep local counter for logging
+ while (!_taskQueue.isEmpty()) {
+ QueuedTask<?> task = _taskQueue.poll();
+ if (task != null) {
+ remainingTasks++;
+ task.fail(new IllegalStateException("Executor is shutting down"));
+ _shutdownCanceledTasksMeter.mark();
+ }
+ }
+
+ super.shutdown();
+
+ LOGGER.info("ThrottleOnCriticalHeapUsageExecutor shutdown. Stats - Queued:
{}, Processed: {}, "
+ + "Timed out: {}, Shutdown canceled: {}, Remaining: {}",
+ _queuedTasksMeter.count(), _processedTasksMeter.count(),
_timedOutTasksMeter.count(),
+ _shutdownCanceledTasksMeter.count(), remainingTasks);
+ }
+
+ public int getQueueSize() {
+ return _taskQueue.size();
+ }
+
+ /**
+ * Base class for queued tasks.
+ *
+ * <p>The {@code QueuedTask} class represents a task that can be queued for
execution when the system is under
+ * critical heap usage. It provides a common interface for handling task
execution, timeouts, and failures.</p>
+ *
+ * <p>Lifecycle:
+ * <ul>
+ * <li>Tasks are created and added to the queue when heap usage is
critical.</li>
+ * <li>When heap usage drops below the critical level, tasks are dequeued
and executed.</li>
+ * <li>If a task remains in the queue beyond a configured timeout, the
{@code timeout()} method is invoked.</li>
+ * <li>If an exception occurs during execution, the {@code fail(Exception
e)} method is invoked.</li>
+ * </ul>
+ * </p>
+ *
+ * <p>Thread-safety:
+ * <ul>
+ * <li>Instances of {@code QueuedTask} are not inherently thread-safe and
should be accessed in a thread-safe
+ * manner by the enclosing executor.</li>
+ * <li>The enclosing {@code ThrottleOnCriticalHeapUsageExecutor} ensures
proper synchronization when accessing
+ * and modifying the queue.</li>
+ * </ul>
+ * </p>
+ */
+ private abstract static class QueuedTask<T> {
+ private final long _queueTime;
+ private final long _timeoutMs;
+ private volatile T _result;
+ private volatile Exception _exception;
+ private volatile boolean _completed = false;
Review Comment:
These don't need to be `volatile` given they are all protected by the
lock. (minor) Move them after the `final` variables for readability.
##########
pinot-core/src/main/java/org/apache/pinot/core/executor/ThrottleOnCriticalHeapUsageExecutor.java:
##########
@@ -0,0 +1,463 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.executor;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pinot.common.metrics.ServerGauge;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.spi.accounting.ThreadAccountant;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.executor.DecoratorExecutorService;
+import org.apache.pinot.spi.metrics.PinotMeter;
+import org.apache.pinot.spi.query.QueryThreadContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * An Executor that queues tasks when the heap usage is critical instead of
rejecting them.
+ * Heap Usage level is obtained from {@link
ThreadAccountant#throttleQuerySubmission()}.
+ *
+ * Features:
+ * - Tasks are queued when heap usage is critical
+ * - Queued tasks are processed when heap usage drops below critical level
+ * - Configurable queue size and timeout (global default or per-task)
+ * - Background monitoring of heap usage to process queued tasks
+ */
+public class ThrottleOnCriticalHeapUsageExecutor extends
DecoratorExecutorService {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ThrottleOnCriticalHeapUsageExecutor.class);
+
+ // Default configuration values
+ // Defaults are kept near config in CommonConstants; no local defaults
needed here
+ private static final long LOG_THROTTLE_INTERVAL_MS = 30000; // Log queue
status every 30 seconds
+
+ private final BlockingQueue<QueuedTask<?>> _taskQueue;
+ private final int _maxQueueSize;
+ private final long _defaultQueueTimeoutMs;
+ private final ScheduledExecutorService _monitorExecutor;
+ private final AtomicBoolean _isShutdown = new AtomicBoolean(false);
+ private volatile long _lastQueueStatusLogTime = 0;
Review Comment:
This is only accessed in `processQueuedTasks()`, so it probably doesn't need
to be `volatile`. (minor) Move it to follow the `final` variables for readability.
##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -1119,6 +1119,17 @@ public static class Server {
QUERY_EXECUTOR_CONFIG_PREFIX + "." + MAX_EXECUTION_THREADS;
public static final int DEFAULT_QUERY_EXECUTOR_MAX_EXECUTION_THREADS = -1;
// Use number of CPU cores
+ // OOM throttle configuration
+ public static final String CONFIG_OF_OOM_THROTTLE_QUEUE_MAX_SIZE =
+ QUERY_EXECUTOR_CONFIG_PREFIX + ".oomThrottle.queue.maxSize";
Review Comment:
(minor) For consistency, use camel case only for the last part of the key.
Consider `heap.usage.throttle.XXX`
##########
pinot-core/src/main/java/org/apache/pinot/core/executor/ThrottleOnCriticalHeapUsageExecutor.java:
##########
@@ -0,0 +1,463 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.executor;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pinot.common.metrics.ServerGauge;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.spi.accounting.ThreadAccountant;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.executor.DecoratorExecutorService;
+import org.apache.pinot.spi.metrics.PinotMeter;
+import org.apache.pinot.spi.query.QueryThreadContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * An Executor that queues tasks when the heap usage is critical instead of
rejecting them.
+ * Heap Usage level is obtained from {@link
ThreadAccountant#throttleQuerySubmission()}.
+ *
+ * Features:
+ * - Tasks are queued when heap usage is critical
+ * - Queued tasks are processed when heap usage drops below critical level
+ * - Configurable queue size and timeout (global default or per-task)
+ * - Background monitoring of heap usage to process queued tasks
+ */
+public class ThrottleOnCriticalHeapUsageExecutor extends
DecoratorExecutorService {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ThrottleOnCriticalHeapUsageExecutor.class);
+
+ // Default configuration values
+ // Defaults are kept near config in CommonConstants; no local defaults
needed here
+ private static final long LOG_THROTTLE_INTERVAL_MS = 30000; // Log queue
status every 30 seconds
+
+ private final BlockingQueue<QueuedTask<?>> _taskQueue;
+ private final int _maxQueueSize;
+ private final long _defaultQueueTimeoutMs;
+ private final ScheduledExecutorService _monitorExecutor;
+ private final AtomicBoolean _isShutdown = new AtomicBoolean(false);
+ private volatile long _lastQueueStatusLogTime = 0;
+ private final PinotMeter _queuedTasksMeter;
+ private final PinotMeter _processedTasksMeter;
+ private final PinotMeter _timedOutTasksMeter;
+ private final PinotMeter _shutdownCanceledTasksMeter;
+
+ public ThrottleOnCriticalHeapUsageExecutor(ExecutorService executorService,
+ int maxQueueSize, long defaultQueueTimeoutMs, long monitorIntervalMs) {
+ super(executorService);
+ _maxQueueSize = maxQueueSize;
+ _defaultQueueTimeoutMs = defaultQueueTimeoutMs;
+ // If maxQueueSize <= 0, use an unbounded queue; otherwise, use bounded
queue
+ _taskQueue = (maxQueueSize <= 0) ? new LinkedBlockingQueue<>() : new
LinkedBlockingQueue<>(maxQueueSize);
+
+ // Create a single-threaded scheduler for monitoring heap usage
+ _monitorExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
+ Thread t = new Thread(r, "throttle-heap-monitor");
+ t.setDaemon(true);
+ return t;
+ });
+
+ // Start the monitoring task
+ _monitorExecutor.scheduleWithFixedDelay(this::processQueuedTasks,
+ monitorIntervalMs, monitorIntervalMs, TimeUnit.MILLISECONDS);
+
+ // Register meters for queue activity tracking
+ ServerMetrics metrics = ServerMetrics.get();
+ _queuedTasksMeter =
metrics.getMeteredValue(ServerMeter.THROTTLE_EXECUTOR_QUEUED_TASKS_TOTAL);
+ _processedTasksMeter =
metrics.getMeteredValue(ServerMeter.THROTTLE_EXECUTOR_PROCESSED_TASKS_TOTAL);
+ _timedOutTasksMeter =
metrics.getMeteredValue(ServerMeter.THROTTLE_EXECUTOR_TIMED_OUT_TASKS_TOTAL);
+ _shutdownCanceledTasksMeter = metrics.getMeteredValue(
+ ServerMeter.THROTTLE_EXECUTOR_SHUTDOWN_CANCELED_TASKS_TOTAL);
+
+ // Register a global gauge for current queue size
+ metrics.setOrUpdateGlobalGauge(ServerGauge.THROTTLE_EXECUTOR_QUEUE_SIZE,
() -> (long) _taskQueue.size());
+
+ LOGGER.info(
+ "ThrottleOnCriticalHeapUsageExecutor initialized with queue size: {},
default timeout: {}ms, "
+ + "monitor interval: {}ms (maxQueueSize<=0 => unbounded)",
maxQueueSize, defaultQueueTimeoutMs,
+ monitorIntervalMs);
+ }
+
+ /**
+ * Check if a task should be queued due to critical heap usage
+ * @return true if the task should be queued, false if it can be executed
immediately
+ */
+ protected boolean shouldQueueTask() {
+ return QueryThreadContext.get().getAccountant().throttleQuerySubmission();
+ }
+
+ /**
+ * Process queued tasks when heap usage is below critical level
+ */
+ private void processQueuedTasks() {
+ if (_isShutdown.get()) {
+ return;
+ }
+
+ try {
+ int initialQueueSize = _taskQueue.size();
+ long currentTime = System.currentTimeMillis();
+
+ // Log queue size for monitoring if there are queued tasks (throttled to
prevent log flooding)
+ if (initialQueueSize > 0 && (currentTime - _lastQueueStatusLogTime) >
LOG_THROTTLE_INTERVAL_MS) {
+ LOGGER.info("Processing queued tasks. Current queue size: {}, Queued:
{}, Processed: {}, "
+ + "Timed out: {}, Shutdown canceled: {}",
+ initialQueueSize, _queuedTasksMeter.count(),
_processedTasksMeter.count(),
+ _timedOutTasksMeter.count(), _shutdownCanceledTasksMeter.count());
+ _lastQueueStatusLogTime = currentTime;
+ // Metrics are exported via ServerMetrics global gauges registered by
callers
+ }
+
+ // Process tasks while heap usage is not critical and queue is not empty
+ while (!shouldQueueTask() && !_taskQueue.isEmpty()) {
+ QueuedTask<?> queuedTask = _taskQueue.poll();
+ if (queuedTask != null) {
+ long queueTime = System.currentTimeMillis() -
queuedTask.getQueueTime();
+
+ if (queueTime > queuedTask.getTimeoutMs()) {
+ // Task has timed out in queue
+ queuedTask.timeout();
+ _timedOutTasksMeter.mark();
+ LOGGER.warn("Task timed out after {}ms in queue (timeout: {}ms)",
queueTime, queuedTask.getTimeoutMs());
+ } else {
+ // Submit the task for execution on the underlying executor (via
this decorator)
+ execute(() -> {
+ try {
+ queuedTask.execute();
+ _processedTasksMeter.mark();
+ LOGGER.debug("Processed queued task after {}ms in queue",
queueTime);
+ } catch (Exception e) {
+ LOGGER.error("Error executing queued task", e);
+ queuedTask.fail(e);
+ }
+ });
+ }
+ }
+ }
+
+ // Log completion only for significant processing (5+ tasks) to avoid
log spam
+ int finalQueueSize = _taskQueue.size();
+ if (initialQueueSize > 0 && initialQueueSize != finalQueueSize) {
+ int processedThisCycle = initialQueueSize - finalQueueSize;
+ if (processedThisCycle >= 5) {
+ LOGGER.info("Completed processing cycle. Processed {} tasks,
remaining queue size: {}",
+ processedThisCycle, finalQueueSize);
+ } else {
+ LOGGER.debug("Completed processing cycle. Processed {} tasks,
remaining queue size: {}",
+ processedThisCycle, finalQueueSize);
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.error("Error in processQueuedTasks", e);
+ }
+ }
+
+ @Override
+ protected <T> Callable<T> decorate(Callable<T> task) {
+ return () -> {
+ if (shouldQueueTask()) {
+ // Queue the task if heap usage is critical
+ long timeoutMs = computePerTaskTimeoutMs(_defaultQueueTimeoutMs);
+ return queueCallableTask(task, timeoutMs);
+ } else {
+ // Execute immediately if heap usage is normal
+ return task.call();
+ }
+ };
+ }
+
+ @Override
+ protected Runnable decorate(Runnable task) {
+ return () -> {
+ if (shouldQueueTask()) {
+ // Queue the task if heap usage is critical
+ long timeoutMs = computePerTaskTimeoutMs(_defaultQueueTimeoutMs);
+ queueRunnableTask(task, timeoutMs);
+ } else {
+ // Execute immediately if heap usage is normal
+ task.run();
+ }
+ };
+ }
+
+  /**
+   * Computes how long a task may wait in the queue.
+   *
+   * <p>When a {@code QueryThreadContext} with an execution context is available, the wait is capped by the time
+   * left until the query's active deadline; otherwise the configured timeout is used unchanged.</p>
+   *
+   * @param configuredTimeoutMs the configured queue timeout in milliseconds
+   * @return {@code 0} if the active deadline has already passed, the smaller of the configured timeout and the
+   *         remaining time if a deadline is available, or {@code configuredTimeoutMs} otherwise
+   */
+  private long computePerTaskTimeoutMs(long configuredTimeoutMs) {
+    try {
+      if (QueryThreadContext.getIfAvailable() == null || QueryThreadContext.get().getExecutionContext() == null) {
+        return configuredTimeoutMs;
+      }
+      long remainingMs =
+          QueryThreadContext.get().getExecutionContext().getActiveDeadlineMs() - System.currentTimeMillis();
+      // Never wait in the queue past the query's remaining active deadline
+      return remainingMs <= 0 ? 0L : Math.min(configuredTimeoutMs, remainingMs);
+    } catch (Throwable t) {
+      // Any failure while reading the context falls back to the configured timeout
+      LOGGER.debug("Failed to compute per-task timeout from thread context; using configured timeout.", t);
+      return configuredTimeoutMs;
+    }
+  }
+
+  /**
+   * Queues a callable task and waits for its result.
+   *
+   * @param task the callable to place on the queue
+   * @param timeoutMs the queue timeout in milliseconds, used both for the queued task and for the wait
+   * @return the result obtained from the queued task
+   * @throws Exception if the queue rejects the task because it is full, or if waiting on the queued task fails
+   */
+  private <T> T queueCallableTask(Callable<T> task, long timeoutMs)
+      throws Exception {
+    QueuedCallableTask<T> queuedTask = new QueuedCallableTask<>(task, timeoutMs);
+
+    // Always offer the task. Guarding the offer with `_maxQueueSize > 0` would skip the enqueue entirely for the
+    // unbounded configuration (maxQueueSize <= 0), leaving the caller to wait for a task that can never be picked
+    // up. An unbounded queue never rejects an offer, so only a bounded queue can reach the rejection branch.
+    if (!_taskQueue.offer(queuedTask)) {
+      // Queue is full
+      throw QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.asException(
+          "Task queue is full (size: " + _maxQueueSize + ") due to high heap usage.");
+    }
+
+    _queuedTasksMeter.mark();
+    LOGGER.debug("Queued callable task, queue size: {}", _taskQueue.size());
+
+    // Wait for the task to complete or timeout
+    return queuedTask.get(timeoutMs, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Queues a runnable task and waits for it to finish.
+   *
+   * <p>Failures from the wait are rethrown unchecked: a {@code RuntimeException} cause is rethrown as is, anything
+   * else is wrapped in a {@code RuntimeException}.</p>
+   *
+   * @param task the runnable to place on the queue
+   * @param timeoutMs the queue timeout in milliseconds, used both for the queued task and for the wait
+   */
+  private void queueRunnableTask(Runnable task, long timeoutMs) {
+    QueuedRunnableTask queuedTask = new QueuedRunnableTask(task, timeoutMs);
+
+    // Always offer the task. Guarding the offer with `_maxQueueSize > 0` would skip the enqueue entirely for the
+    // unbounded configuration (maxQueueSize <= 0), leaving the caller to wait for a task that can never be picked
+    // up. An unbounded queue never rejects an offer, so only a bounded queue can reach the rejection branch.
+    if (!_taskQueue.offer(queuedTask)) {
+      // Queue is full
+      throw QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.asException(
+          "Task queue is full (size: " + _maxQueueSize + ") due to high heap usage.");
+    }
+
+    _queuedTasksMeter.mark();
+    LOGGER.debug("Queued runnable task, queue size: {}", _taskQueue.size());
+
+    try {
+      // Wait for the task to complete or timeout
+      queuedTask.get(timeoutMs, TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      if (e instanceof InterruptedException) {
+        // Keep the interrupt visible to the caller; wrapping the exception below would otherwise lose it
+        Thread.currentThread().interrupt();
+      }
+      if (e.getCause() instanceof RuntimeException) {
+        throw (RuntimeException) e.getCause();
+      } else {
+        throw new RuntimeException("Error executing queued task", e);
+      }
+    }
+  }
+
+  /**
+   * Stops the monitor executor, fails every task still sitting in the queue, then delegates to
+   * {@code super.shutdown()} and logs the final task statistics.
+   */
+  @Override
+  public void shutdown() {
+    _isShutdown.set(true);
+    _monitorExecutor.shutdownNow();
+
+    // Wait up to 30 seconds for the monitor thread to finish its current processing
+    try {
+      boolean terminated = _monitorExecutor.awaitTermination(30, TimeUnit.SECONDS);
+      if (!terminated) {
+        LOGGER.warn("Monitor executor did not terminate within the timeout period.");
+      }
+    } catch (InterruptedException e) {
+      LOGGER.warn("Interrupted while waiting for monitor executor to terminate.", e);
+      Thread.currentThread().interrupt();
+    }
+
+    // Drain the queue, failing each leftover task with a shutdown exception; the count is kept for logging only
+    int canceledCount = 0;
+    for (QueuedTask<?> pending = _taskQueue.poll(); pending != null; pending = _taskQueue.poll()) {
+      canceledCount++;
+      pending.fail(new IllegalStateException("Executor is shutting down"));
+      _shutdownCanceledTasksMeter.mark();
+    }
+
+    super.shutdown();
+
+    LOGGER.info("ThrottleOnCriticalHeapUsageExecutor shutdown. Stats - Queued: {}, Processed: {}, "
+            + "Timed out: {}, Shutdown canceled: {}, Remaining: {}",
+        _queuedTasksMeter.count(), _processedTasksMeter.count(), _timedOutTasksMeter.count(),
+        _shutdownCanceledTasksMeter.count(), canceledCount);
+  }
+
+  /**
+   * Returns the number of tasks currently held in the task queue.
+   *
+   * @return the current size of the task queue
+   */
+  public int getQueueSize() {
+    return _taskQueue.size();
+  }
+
+ /**
+ * Base class for queued tasks.
+ *
+ * <p>The {@code QueuedTask} class represents a task that can be queued for
execution when the system is under
+ * critical heap usage. It provides a common interface for handling task
execution, timeouts, and failures.</p>
+ *
+ * <p>Lifecycle:
+ * <ul>
+ * <li>Tasks are created and added to the queue when heap usage is
critical.</li>
+ * <li>When heap usage drops below the critical level, tasks are dequeued
and executed.</li>
+ * <li>If a task remains in the queue beyond a configured timeout, the
{@code timeout()} method is invoked.</li>
+ * <li>If an exception occurs during execution, the {@code fail(Exception
e)} method is invoked.</li>
+ * </ul>
+ * </p>
+ *
+ * <p>Thread-safety:
+ * <ul>
+ * <li>Instances of {@code QueuedTask} are not inherently thread-safe and
should be accessed in a thread-safe
+ * manner by the enclosing executor.</li>
+ * <li>The enclosing {@code ThrottleOnCriticalHeapUsageExecutor} ensures
proper synchronization when accessing
+ * and modifying the queue.</li>
+ * </ul>
+ * </p>
+ */
+ private abstract static class QueuedTask<T> {
+ private final long _queueTime;
+ private final long _timeoutMs;
+ private volatile T _result;
+ private volatile Exception _exception;
+ private volatile boolean _completed = false;
+ private final Object _lock = new Object();
+
+ protected QueuedTask(long timeoutMs) {
+ _queueTime = System.currentTimeMillis();
+ _timeoutMs = timeoutMs;
+ }
+
+ public long getQueueTime() {
+ return _queueTime;
+ }
+
+ public long getTimeoutMs() {
+ return _timeoutMs;
+ }
+
+ protected final void completeWithResult(T result) {
+ synchronized (_lock) {
+ _result = result;
+ _completed = true;
+ _lock.notifyAll();
+ }
+ }
+
+ public abstract void execute()
+ throws Exception;
Review Comment:
`execute()` won't throw any exception, since the exception is stored in a separate field
##########
pinot-core/src/main/java/org/apache/pinot/core/executor/ThrottleOnCriticalHeapUsageExecutor.java:
##########
@@ -0,0 +1,463 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.executor;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pinot.common.metrics.ServerGauge;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.spi.accounting.ThreadAccountant;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.executor.DecoratorExecutorService;
+import org.apache.pinot.spi.metrics.PinotMeter;
+import org.apache.pinot.spi.query.QueryThreadContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * An Executor that queues tasks when the heap usage is critical instead of
rejecting them.
+ * Heap Usage level is obtained from {@link
ThreadAccountant#throttleQuerySubmission()}.
+ *
+ * Features:
+ * - Tasks are queued when heap usage is critical
+ * - Queued tasks are processed when heap usage drops below critical level
+ * - Configurable queue size and timeout (global default or per-task)
+ * - Background monitoring of heap usage to process queued tasks
+ */
+public class ThrottleOnCriticalHeapUsageExecutor extends
DecoratorExecutorService {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ThrottleOnCriticalHeapUsageExecutor.class);
+
+  // Defaults for the queue configuration are kept next to the config keys in CommonConstants, so no local defaults
+  // are needed here
+  private static final long LOG_THROTTLE_INTERVAL_MS = 30000; // Log queue status every 30 seconds
+
+  // Tasks waiting for heap usage to drop; bounded by _maxQueueSize when that is positive, unbounded otherwise
+  private final BlockingQueue<QueuedTask<?>> _taskQueue;
+  // Configured queue capacity; a value <= 0 selects an unbounded queue
+  private final int _maxQueueSize;
+  // Queue timeout in milliseconds that the decorate methods pass to computePerTaskTimeoutMs
+  private final long _defaultQueueTimeoutMs;
+  // Single-threaded scheduler that periodically runs processQueuedTasks
+  private final ScheduledExecutorService _monitorExecutor;
+  // Set to true by shutdown(); processQueuedTasks returns immediately once it is set
+  private final AtomicBoolean _isShutdown = new AtomicBoolean(false);
+  // System.currentTimeMillis() value of the last queue status log line, used to throttle that log
+  private volatile long _lastQueueStatusLogTime = 0;
+  // Meters counting queued, processed, timed-out and shutdown-canceled tasks
+  private final PinotMeter _queuedTasksMeter;
+  private final PinotMeter _processedTasksMeter;
+  private final PinotMeter _timedOutTasksMeter;
+  private final PinotMeter _shutdownCanceledTasksMeter;
+
+  /**
+   * Creates the throttling executor around an existing executor service.
+   *
+   * @param executorService the executor service passed to the superclass constructor
+   * @param maxQueueSize capacity of the task queue; a value {@code <= 0} selects an unbounded queue
+   * @param defaultQueueTimeoutMs default time in milliseconds a task may wait in the queue
+   * @param monitorIntervalMs initial delay and fixed delay, in milliseconds, between runs of the queue monitor
+   */
+  public ThrottleOnCriticalHeapUsageExecutor(ExecutorService executorService,
+      int maxQueueSize, long defaultQueueTimeoutMs, long monitorIntervalMs) {
+    super(executorService);
+    _maxQueueSize = maxQueueSize;
+    _defaultQueueTimeoutMs = defaultQueueTimeoutMs;
+    // If maxQueueSize <= 0, use an unbounded queue; otherwise, use bounded queue
+    _taskQueue = (maxQueueSize <= 0) ? new LinkedBlockingQueue<>() : new LinkedBlockingQueue<>(maxQueueSize);
+
+    // Register meters for queue activity tracking
+    ServerMetrics metrics = ServerMetrics.get();
+    _queuedTasksMeter = metrics.getMeteredValue(ServerMeter.THROTTLE_EXECUTOR_QUEUED_TASKS_TOTAL);
+    _processedTasksMeter = metrics.getMeteredValue(ServerMeter.THROTTLE_EXECUTOR_PROCESSED_TASKS_TOTAL);
+    _timedOutTasksMeter = metrics.getMeteredValue(ServerMeter.THROTTLE_EXECUTOR_TIMED_OUT_TASKS_TOTAL);
+    _shutdownCanceledTasksMeter = metrics.getMeteredValue(
+        ServerMeter.THROTTLE_EXECUTOR_SHUTDOWN_CANCELED_TASKS_TOTAL);
+
+    // Register a global gauge for current queue size
+    metrics.setOrUpdateGlobalGauge(ServerGauge.THROTTLE_EXECUTOR_QUEUE_SIZE, () -> (long) _taskQueue.size());
+
+    // Create a single-threaded scheduler for monitoring heap usage
+    _monitorExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
+      Thread t = new Thread(r, "throttle-heap-monitor");
+      t.setDaemon(true);
+      return t;
+    });
+
+    // Start the monitoring task last: it hands `this` to the monitor thread, so every field that
+    // processQueuedTasks reads (queue, meters) must already be assigned before it can run
+    _monitorExecutor.scheduleWithFixedDelay(this::processQueuedTasks,
+        monitorIntervalMs, monitorIntervalMs, TimeUnit.MILLISECONDS);
+
+    LOGGER.info(
+        "ThrottleOnCriticalHeapUsageExecutor initialized with queue size: {}, default timeout: {}ms, "
+            + "monitor interval: {}ms (maxQueueSize<=0 => unbounded)", maxQueueSize, defaultQueueTimeoutMs,
+        monitorIntervalMs);
+  }
+
+  /**
+   * Check if a task should be queued due to critical heap usage.
+   *
+   * <p>The decision is delegated to {@code throttleQuerySubmission()} on the accountant obtained from
+   * {@code QueryThreadContext.get()}.</p>
+   *
+   * <p>NOTE(review): this method is also called from {@code processQueuedTasks}, which runs on the monitor thread;
+   * confirm that {@code QueryThreadContext.get()} is usable on a thread with no query context.</p>
+   *
+   * @return true if the task should be queued, false if it can be executed immediately
+   */
+  protected boolean shouldQueueTask() {
+    return QueryThreadContext.get().getAccountant().throttleQuerySubmission();
+  }
+
+ /**
+ * Process queued tasks when heap usage is below critical level
+ */
+ private void processQueuedTasks() {
+ if (_isShutdown.get()) {
+ return;
+ }
+
+ try {
+ int initialQueueSize = _taskQueue.size();
+ long currentTime = System.currentTimeMillis();
+
+ // Log queue size for monitoring if there are queued tasks (throttled to
prevent log flooding)
+ if (initialQueueSize > 0 && (currentTime - _lastQueueStatusLogTime) >
LOG_THROTTLE_INTERVAL_MS) {
+ LOGGER.info("Processing queued tasks. Current queue size: {}, Queued:
{}, Processed: {}, "
+ + "Timed out: {}, Shutdown canceled: {}",
+ initialQueueSize, _queuedTasksMeter.count(),
_processedTasksMeter.count(),
+ _timedOutTasksMeter.count(), _shutdownCanceledTasksMeter.count());
+ _lastQueueStatusLogTime = currentTime;
+ // Metrics are exported via ServerMetrics global gauges registered by
callers
+ }
+
+ // Process tasks while heap usage is not critical and queue is not empty
+ while (!shouldQueueTask() && !_taskQueue.isEmpty()) {
+ QueuedTask<?> queuedTask = _taskQueue.poll();
+ if (queuedTask != null) {
+ long queueTime = System.currentTimeMillis() -
queuedTask.getQueueTime();
+
+ if (queueTime > queuedTask.getTimeoutMs()) {
+ // Task has timed out in queue
+ queuedTask.timeout();
+ _timedOutTasksMeter.mark();
+ LOGGER.warn("Task timed out after {}ms in queue (timeout: {}ms)",
queueTime, queuedTask.getTimeoutMs());
+ } else {
+ // Submit the task for execution on the underlying executor (via
this decorator)
+ execute(() -> {
Review Comment:
(MAJOR) Should we use `_executorService.submit()` instead? We shouldn't
decorate the task again. We should add a test for this.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]