Jackie-Jiang commented on code in PR #16409:
URL: https://github.com/apache/pinot/pull/16409#discussion_r2305002281
##########
pinot-spi/src/main/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutor.java:
##########
@@ -18,46 +18,563 @@
*/
package org.apache.pinot.spi.executor;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * An Executor that throttles task submission when the heap usage is critical.
+ * An Executor that queues tasks when the heap usage is critical instead of
rejecting them.
* Heap Usage level is obtained from {@link
ThreadResourceUsageAccountant#throttleQuerySubmission()}.
+ *
+ * Features:
+ * - Tasks are queued when heap usage is critical
+ * - Queued tasks are processed when heap usage drops below critical level
+ * - Configurable queue size and timeout (global default or per-task)
+ * - Background monitoring of heap usage to process queued tasks
*/
public class ThrottleOnCriticalHeapUsageExecutor extends
DecoratorExecutorService {
- ThreadResourceUsageAccountant _threadResourceUsageAccountant;
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ThrottleOnCriticalHeapUsageExecutor.class);
+
+ // Default configuration values
+ private static final int DEFAULT_QUEUE_SIZE = 1000;
+ private static final long DEFAULT_QUEUE_TIMEOUT_MS = 30000; // 30 seconds
+ private static final long DEFAULT_MONITOR_INTERVAL_MS = 1000; // 1 second
+ private static final long LOG_THROTTLE_INTERVAL_MS = 30000; // Log queue
status every 30 seconds
+
+ private final ThreadResourceUsageAccountant _threadResourceUsageAccountant;
+ private final BlockingQueue<QueuedTask> _taskQueue;
+ private final int _maxQueueSize;
+ private final long _defaultQueueTimeoutMs;
+ private final ScheduledExecutorService _monitorExecutor;
+ private final AtomicBoolean _isShutdown = new AtomicBoolean(false);
+ private final AtomicInteger _queuedTaskCount = new AtomicInteger(0);
+ private final AtomicInteger _processedTaskCount = new AtomicInteger(0);
+ private final AtomicInteger _timedOutTaskCount = new AtomicInteger(0);
+ private final AtomicInteger _shutdownCanceledTaskCount = new
AtomicInteger(0);
+ private volatile long _lastQueueStatusLogTime = 0;
public ThrottleOnCriticalHeapUsageExecutor(ExecutorService executorService,
ThreadResourceUsageAccountant threadResourceUsageAccountant) {
+ this(executorService, threadResourceUsageAccountant, DEFAULT_QUEUE_SIZE,
+ DEFAULT_QUEUE_TIMEOUT_MS, DEFAULT_MONITOR_INTERVAL_MS);
+ }
+
+ public ThrottleOnCriticalHeapUsageExecutor(ExecutorService executorService,
+ ThreadResourceUsageAccountant threadResourceUsageAccountant,
+ int maxQueueSize, long defaultQueueTimeoutMs, long monitorIntervalMs) {
super(executorService);
_threadResourceUsageAccountant = threadResourceUsageAccountant;
+ _maxQueueSize = maxQueueSize;
+ _defaultQueueTimeoutMs = defaultQueueTimeoutMs;
+ _taskQueue = new LinkedBlockingQueue<>(maxQueueSize);
+
+ // Create a single-threaded scheduler for monitoring heap usage
+ _monitorExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
+ Thread t = new Thread(r, "throttle-heap-monitor");
+ t.setDaemon(true);
+ return t;
+ });
+
+ // Start the monitoring task
+ _monitorExecutor.scheduleWithFixedDelay(this::processQueuedTasks,
+ monitorIntervalMs, monitorIntervalMs, TimeUnit.MILLISECONDS);
+
+ LOGGER.info(
+ "ThrottleOnCriticalHeapUsageExecutor initialized with queue size: {},
default timeout: {}ms, "
+ + "monitor interval: {}ms", maxQueueSize, defaultQueueTimeoutMs,
monitorIntervalMs);
}
protected void checkTaskAllowed() {
Review Comment:
I don't think we need to maintain backward compatibility here, given that this
executor is not public-facing.
##########
pinot-spi/src/main/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutor.java:
##########
@@ -18,46 +18,563 @@
*/
package org.apache.pinot.spi.executor;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * An Executor that throttles task submission when the heap usage is critical.
+ * An Executor that queues tasks when the heap usage is critical instead of
rejecting them.
* Heap Usage level is obtained from {@link
ThreadResourceUsageAccountant#throttleQuerySubmission()}.
+ *
+ * Features:
+ * - Tasks are queued when heap usage is critical
+ * - Queued tasks are processed when heap usage drops below critical level
+ * - Configurable queue size and timeout (global default or per-task)
+ * - Background monitoring of heap usage to process queued tasks
*/
public class ThrottleOnCriticalHeapUsageExecutor extends
DecoratorExecutorService {
- ThreadResourceUsageAccountant _threadResourceUsageAccountant;
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ThrottleOnCriticalHeapUsageExecutor.class);
+
+ // Default configuration values
+ private static final int DEFAULT_QUEUE_SIZE = 1000;
+ private static final long DEFAULT_QUEUE_TIMEOUT_MS = 30000; // 30 seconds
+ private static final long DEFAULT_MONITOR_INTERVAL_MS = 1000; // 1 second
+ private static final long LOG_THROTTLE_INTERVAL_MS = 30000; // Log queue
status every 30 seconds
+
+ private final ThreadResourceUsageAccountant _threadResourceUsageAccountant;
+ private final BlockingQueue<QueuedTask> _taskQueue;
+ private final int _maxQueueSize;
+ private final long _defaultQueueTimeoutMs;
+ private final ScheduledExecutorService _monitorExecutor;
+ private final AtomicBoolean _isShutdown = new AtomicBoolean(false);
+ private final AtomicInteger _queuedTaskCount = new AtomicInteger(0);
+ private final AtomicInteger _processedTaskCount = new AtomicInteger(0);
+ private final AtomicInteger _timedOutTaskCount = new AtomicInteger(0);
+ private final AtomicInteger _shutdownCanceledTaskCount = new
AtomicInteger(0);
+ private volatile long _lastQueueStatusLogTime = 0;
public ThrottleOnCriticalHeapUsageExecutor(ExecutorService executorService,
ThreadResourceUsageAccountant threadResourceUsageAccountant) {
+ this(executorService, threadResourceUsageAccountant, DEFAULT_QUEUE_SIZE,
+ DEFAULT_QUEUE_TIMEOUT_MS, DEFAULT_MONITOR_INTERVAL_MS);
+ }
+
+ public ThrottleOnCriticalHeapUsageExecutor(ExecutorService executorService,
+ ThreadResourceUsageAccountant threadResourceUsageAccountant,
+ int maxQueueSize, long defaultQueueTimeoutMs, long monitorIntervalMs) {
super(executorService);
_threadResourceUsageAccountant = threadResourceUsageAccountant;
+ _maxQueueSize = maxQueueSize;
+ _defaultQueueTimeoutMs = defaultQueueTimeoutMs;
+ _taskQueue = new LinkedBlockingQueue<>(maxQueueSize);
+
+ // Create a single-threaded scheduler for monitoring heap usage
+ _monitorExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
+ Thread t = new Thread(r, "throttle-heap-monitor");
+ t.setDaemon(true);
+ return t;
+ });
+
+ // Start the monitoring task
+ _monitorExecutor.scheduleWithFixedDelay(this::processQueuedTasks,
+ monitorIntervalMs, monitorIntervalMs, TimeUnit.MILLISECONDS);
+
+ LOGGER.info(
+ "ThrottleOnCriticalHeapUsageExecutor initialized with queue size: {},
default timeout: {}ms, "
+ + "monitor interval: {}ms", maxQueueSize, defaultQueueTimeoutMs,
monitorIntervalMs);
}
protected void checkTaskAllowed() {
- if (_threadResourceUsageAccountant.throttleQuerySubmission()) {
- throw QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.asException("Tasks
throttled due to high heap usage.");
+ }
+
+ /**
+ * Check if a task should be queued due to critical heap usage
+ * @return true if the task should be queued, false if it can be executed
immediately
+ */
+ protected boolean shouldQueueTask() {
+ return _threadResourceUsageAccountant.throttleQuerySubmission();
+ }
+
+ /**
+ * Process queued tasks when heap usage is below critical level
+ */
+ private void processQueuedTasks() {
+ if (_isShutdown.get()) {
+ return;
+ }
+
+ try {
+ int initialQueueSize = _taskQueue.size();
+ long currentTime = System.currentTimeMillis();
+
+ // Log queue size for monitoring if there are queued tasks (throttled to
prevent log flooding)
+ if (initialQueueSize > 0 && (currentTime - _lastQueueStatusLogTime) >
LOG_THROTTLE_INTERVAL_MS) {
+ LOGGER.info("Processing queued tasks. Current queue size: {}, Queued:
{}, Processed: {}, "
+ + "Timed out: {}, Shutdown canceled: {}",
+ initialQueueSize, _queuedTaskCount.get(),
_processedTaskCount.get(),
+ _timedOutTaskCount.get(), _shutdownCanceledTaskCount.get());
+ _lastQueueStatusLogTime = currentTime;
+ }
+
+ // Process tasks while heap usage is not critical and queue is not empty
+ while (!shouldQueueTask() && !_taskQueue.isEmpty()) {
+ QueuedTask queuedTask = _taskQueue.poll();
+ if (queuedTask != null) {
+ long queueTime = System.currentTimeMillis() -
queuedTask.getQueueTime();
+
+ if (queueTime > queuedTask.getTimeoutMs()) {
+ // Task has timed out in queue
+ queuedTask.timeout();
+ _timedOutTaskCount.incrementAndGet();
+ LOGGER.warn("Task timed out after {}ms in queue (timeout: {}ms)",
queueTime, queuedTask.getTimeoutMs());
+ } else {
+ // Submit the task for execution
+ try {
+ queuedTask.execute();
+ _processedTaskCount.incrementAndGet();
+ LOGGER.debug("Processed queued task after {}ms in queue",
queueTime);
+ } catch (Exception e) {
+ LOGGER.error("Error executing queued task", e);
+ queuedTask.fail(e);
+ }
+ }
+ }
+ }
+
+ // Log completion only for significant processing (5+ tasks) to avoid
log spam
+ int finalQueueSize = _taskQueue.size();
+ if (initialQueueSize > 0 && initialQueueSize != finalQueueSize) {
+ int processedThisCycle = initialQueueSize - finalQueueSize;
+ if (processedThisCycle >= 5) {
+ LOGGER.info("Completed processing cycle. Processed {} tasks,
remaining queue size: {}",
+ processedThisCycle, finalQueueSize);
+ } else {
+ LOGGER.debug("Completed processing cycle. Processed {} tasks,
remaining queue size: {}",
+ processedThisCycle, finalQueueSize);
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.error("Error in processQueuedTasks", e);
}
}
@Override
protected <T> Callable<T> decorate(Callable<T> task) {
- checkTaskAllowed();
return () -> {
- checkTaskAllowed();
- return task.call();
+ if (shouldQueueTask()) {
+ // Queue the task if heap usage is critical
+ return queueCallableTask(task);
+ } else {
+ // Execute immediately if heap usage is normal
+ return task.call();
+ }
};
}
@Override
protected Runnable decorate(Runnable task) {
- checkTaskAllowed();
return () -> {
- checkTaskAllowed();
- task.run();
+ if (shouldQueueTask()) {
+ // Queue the task if heap usage is critical
+ queueRunnableTask(task);
+ } else {
+ // Execute immediately if heap usage is normal
+ task.run();
+ }
};
}
+
+ /**
+ * Execute a callable task with custom timeout if queuing is needed.
+ * This allows per-task timeout specification instead of using the global
default.
+ *
+ * @param task the callable task to execute
+ * @param timeoutMs the timeout in milliseconds for this specific task if it
gets queued
+ * @return the result of the callable task
+ * @throws Exception if execution fails
+ */
+ public <T> T executeWithTimeout(Callable<T> task, long timeoutMs)
Review Comment:
This method does not appear to be used anywhere, which means the per-task timeout
is never honored. We cannot apply the same (global default) timeout to all queries.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]