This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 36ce140f18 fix race condition in `ScalingThreadPoolExecutor` (#13360) 36ce140f18 is described below commit 36ce140f18429f5f0505704b9a25e022d1042c9e Author: Christopher Peck <27231838+itschrisp...@users.noreply.github.com> AuthorDate: Tue Jun 11 16:44:41 2024 -0700 fix race condition in `ScalingThreadPoolExecutor` (#13360) --- .../common/utils/ScalingThreadPoolExecutor.java | 45 +++++++++++++++------- .../utils/ScalingThreadPoolExecutorTest.java | 15 ++++++++ 2 files changed, 46 insertions(+), 14 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/ScalingThreadPoolExecutor.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/ScalingThreadPoolExecutor.java index 989658a692..ea4f00a8e8 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/ScalingThreadPoolExecutor.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/ScalingThreadPoolExecutor.java @@ -26,19 +26,20 @@ import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import javax.annotation.Nonnull; +import javax.annotation.Nullable; /** * ScalingThreadPoolExecutor is an auto-scaling ThreadPoolExecutor. If there is no available thread for a new task, * a new thread will be created by the internal ThreadPoolExecutor to process the task (up to maximumPoolSize). If * there is an available thread, no additional thread will be created. - * + * <p> * This is done by creating a ScalingQueue that will 'reject' a new task if there are no available threads, forcing * the pool to create a new thread. The rejection is then handled to queue the task anyway. - * + * <p> * This differs from the plain ThreadPoolExecutor implementation which does not create new threads if the queue (not - * thread pool) has capacity. For a more complete explanation, see: + * thread pool) has capacity. For a more complete explanation, see (note: the original version includes a race + * condition, and the implementation here differs slightly): * https://github.com/kimchy/kimchy.github.com/blob/master/_posts/2008-11-23-juc-executorservice-gotcha.textile */ public class ScalingThreadPoolExecutor extends ThreadPoolExecutor { @@ -76,7 +77,6 @@ public class ScalingThreadPoolExecutor extends ThreadPoolExecutor { ScalingQueue<Runnable> queue = new ScalingQueue<>(); ThreadPoolExecutor executor = new ScalingThreadPoolExecutor(min, max, keepAliveTime, TimeUnit.MILLISECONDS, queue); executor.setRejectedExecutionHandler(new ForceQueuePolicy()); - queue.setThreadPoolExecutor(executor); return executor; } @@ -100,29 +100,46 @@ public class ScalingThreadPoolExecutor extends ThreadPoolExecutor { */ static class ScalingQueue<E> extends LinkedBlockingQueue<E> { - private ThreadPoolExecutor _executor; + AtomicInteger _currentIdleThreadCount = new AtomicInteger(0); // Creates a queue of size Integer.MAX_SIZE public ScalingQueue() { super(); } - // Sets the executor this queue belongs to - public void setThreadPoolExecutor(ThreadPoolExecutor executor) { - _executor = executor; + @Override + public E take() + throws InterruptedException { + _currentIdleThreadCount.incrementAndGet(); + try { + return super.take(); + } finally { + _currentIdleThreadCount.decrementAndGet(); + } + } + + @Override + @Nullable + public E poll(long timeout, TimeUnit unit) + throws InterruptedException { + _currentIdleThreadCount.incrementAndGet(); + try { + return super.poll(timeout, unit); + } finally { + _currentIdleThreadCount.decrementAndGet(); + } } /** - * Inserts the specified element at the tail of this queue if there is at least one available thread - * to run the current task. If all pool threads are actively busy, it rejects the offer. + * Inserts the specified element at the tail of this queue if there is at least one idle thread + * to run the current task. If all pool threads are actively busy, the offer is rejected. * * @param e the element to add. * @return true if it was possible to add the element to this queue, else false */ @Override - public boolean offer(@Nonnull E e) { - int allWorkingThreads = _executor.getActiveCount() + super.size(); - return allWorkingThreads < _executor.getPoolSize() && super.offer(e); + public boolean offer(E e) { + return _currentIdleThreadCount.get() > 0 && super.offer(e); } } } diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/ScalingThreadPoolExecutorTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/ScalingThreadPoolExecutorTest.java index 5a1203f1d2..7f0e892a46 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/utils/ScalingThreadPoolExecutorTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/ScalingThreadPoolExecutorTest.java @@ -57,6 +57,21 @@ public class ScalingThreadPoolExecutorTest { "Timed out waiting for thread pool to scale down"); } + @Test + public void testRapidSubmission() { + ThreadPoolExecutor executorService = (ThreadPoolExecutor) ScalingThreadPoolExecutor.newScalingThreadPool(0, 4, 0L); + Runnable r1 = getSleepingRunnable(); + Runnable r2 = getSleepingRunnable(); + + // When Runnables are submitted rapidly, the pool should scale up to 2 threads. The previous test cases can fail + // to catch such a race condition because Runnables are initialized as they are submitted, which introduced enough + // delay to avoid the condition + executorService.submit(r1); + executorService.submit(r2); + TestUtils.waitForCondition(aVoid -> executorService.getPoolSize() == 2, 2000, + "Timed out waiting for thread pool to scale up"); + } + private Runnable getSleepingRunnable() { return () -> { try { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org