This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 36ce140f18 fix race condition in `ScalingThreadPoolExecutor` (#13360)
36ce140f18 is described below

commit 36ce140f18429f5f0505704b9a25e022d1042c9e
Author: Christopher Peck <27231838+itschrisp...@users.noreply.github.com>
AuthorDate: Tue Jun 11 16:44:41 2024 -0700

    fix race condition in `ScalingThreadPoolExecutor` (#13360)
---
 .../common/utils/ScalingThreadPoolExecutor.java    | 45 +++++++++++++++-------
 .../utils/ScalingThreadPoolExecutorTest.java       | 15 ++++++++
 2 files changed, 46 insertions(+), 14 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/ScalingThreadPoolExecutor.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/ScalingThreadPoolExecutor.java
index 989658a692..ea4f00a8e8 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/ScalingThreadPoolExecutor.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/ScalingThreadPoolExecutor.java
@@ -26,19 +26,20 @@ import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 
 /**
  * ScalingThreadPoolExecutor is an auto-scaling ThreadPoolExecutor. If there is no available thread for a new task,
  * a new thread will be created by the internal ThreadPoolExecutor to process the task (up to maximumPoolSize). If
  * there is an available thread, no additional thread will be created.
- *
+ * <p>
  * This is done by creating a ScalingQueue that will 'reject' a new task if there are no available threads, forcing
  * the pool to create a new thread. The rejection is then handled to queue the task anyway.
- *
+ * <p>
  * This differs from the plain ThreadPoolExecutor implementation which does not create new threads if the queue (not
- * thread pool) has capacity. For a more complete explanation, see:
+ * thread pool) has capacity. For a more complete explanation, see (note: the original version includes a race
+ * condition, and the implementation here differs slightly):
  * https://github.com/kimchy/kimchy.github.com/blob/master/_posts/2008-11-23-juc-executorservice-gotcha.textile
  */
 public class ScalingThreadPoolExecutor extends ThreadPoolExecutor {
@@ -76,7 +77,6 @@ public class ScalingThreadPoolExecutor extends ThreadPoolExecutor {
     ScalingQueue<Runnable> queue = new ScalingQueue<>();
     ThreadPoolExecutor executor = new ScalingThreadPoolExecutor(min, max, 
keepAliveTime, TimeUnit.MILLISECONDS, queue);
     executor.setRejectedExecutionHandler(new ForceQueuePolicy());
-    queue.setThreadPoolExecutor(executor);
     return executor;
   }
 
@@ -100,29 +100,46 @@ public class ScalingThreadPoolExecutor extends ThreadPoolExecutor {
    */
   static class ScalingQueue<E> extends LinkedBlockingQueue<E> {
 
-    private ThreadPoolExecutor _executor;
+    AtomicInteger _currentIdleThreadCount = new AtomicInteger(0);
 
     // Creates a queue of size Integer.MAX_SIZE
     public ScalingQueue() {
       super();
     }
 
-    // Sets the executor this queue belongs to
-    public void setThreadPoolExecutor(ThreadPoolExecutor executor) {
-      _executor = executor;
+    @Override
+    public E take()
+        throws InterruptedException {
+      _currentIdleThreadCount.incrementAndGet();
+      try {
+        return super.take();
+      } finally {
+        _currentIdleThreadCount.decrementAndGet();
+      }
+    }
+
+    @Override
+    @Nullable
+    public E poll(long timeout, TimeUnit unit)
+        throws InterruptedException {
+      _currentIdleThreadCount.incrementAndGet();
+      try {
+        return super.poll(timeout, unit);
+      } finally {
+        _currentIdleThreadCount.decrementAndGet();
+      }
     }
 
     /**
-     * Inserts the specified element at the tail of this queue if there is at least one available thread
-     * to run the current task. If all pool threads are actively busy, it rejects the offer.
+     * Inserts the specified element at the tail of this queue if there is at least one idle thread
+     * to run the current task. If all pool threads are actively busy, the offer is rejected.
      *
      * @param e the element to add.
      * @return true if it was possible to add the element to this queue, else false
      */
     @Override
-    public boolean offer(@Nonnull E e) {
-      int allWorkingThreads = _executor.getActiveCount() + super.size();
-      return allWorkingThreads < _executor.getPoolSize() && super.offer(e);
+    public boolean offer(E e) {
+      return _currentIdleThreadCount.get() > 0 && super.offer(e);
     }
   }
 }
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/ScalingThreadPoolExecutorTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/ScalingThreadPoolExecutorTest.java
index 5a1203f1d2..7f0e892a46 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/utils/ScalingThreadPoolExecutorTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/ScalingThreadPoolExecutorTest.java
@@ -57,6 +57,21 @@ public class ScalingThreadPoolExecutorTest {
         "Timed out waiting for thread pool to scale down");
   }
 
+  @Test
+  public void testRapidSubmission() {
+    ThreadPoolExecutor executorService = (ThreadPoolExecutor) 
ScalingThreadPoolExecutor.newScalingThreadPool(0, 4, 0L);
+    Runnable r1 = getSleepingRunnable();
+    Runnable r2 = getSleepingRunnable();
+
+    // When Runnables are submitted rapidly, the pool should scale up to 2 threads. The previous test cases can fail
+    // to catch such a race condition because Runnables are initialized as they are submitted, which introduced enough
+    // delay to avoid the condition
+    executorService.submit(r1);
+    executorService.submit(r2);
+    TestUtils.waitForCondition(aVoid -> executorService.getPoolSize() == 2, 
2000,
+        "Timed out waiting for thread pool to scale up");
+  }
+
   private Runnable getSleepingRunnable() {
     return () -> {
       try {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to