This is an automated email from the ASF dual-hosted git repository.

markt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tomcat.git

commit 4c314648eccc75c0f8108b3ed394dea69bf75593
Author: Mark Thomas <ma...@apache.org>
AuthorDate: Wed Jul 21 16:36:32 2021 +0100

    Merge in changes from Tomcat's extended ThreadPoolExecutor
---
 .../tomcat/util/threads/ThreadPoolExecutor.java    | 165 ++++++++++++++++++++-
 1 file changed, 160 insertions(+), 5 deletions(-)

diff --git a/java/org/apache/tomcat/util/threads/ThreadPoolExecutor.java b/java/org/apache/tomcat/util/threads/ThreadPoolExecutor.java
index 5193209..7b1ec55 100644
--- a/java/org/apache/tomcat/util/threads/ThreadPoolExecutor.java
+++ b/java/org/apache/tomcat/util/threads/ThreadPoolExecutor.java
@@ -37,10 +37,13 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.tomcat.util.res.StringManager;
+
 /**
  * An {@link java.util.concurrent.ExecutorService}
  * that executes each submitted task using
@@ -325,6 +328,9 @@ import java.util.concurrent.locks.ReentrantLock;
  * @author Doug Lea
  */
 public class ThreadPoolExecutor extends AbstractExecutorService {
+
+    protected static final StringManager sm = StringManager.getManager("org.apache.tomcat.util.threads.res");
+
     /**
      * The main pool control state, ctl, is an atomic integer packing
      * two conceptual fields
@@ -488,6 +494,22 @@ public class ThreadPoolExecutor extends AbstractExecutorService {
      */
     private long completedTaskCount;
 
+    /**
+     * The number of tasks submitted but not yet finished. This includes tasks
+     * in the queue and tasks that have been handed to a worker thread but the
+     * latter did not start executing the task yet.
+     * This number is always greater or equal to {@link #getActiveCount()}.
+     */
+    private final AtomicInteger submittedCount = new AtomicInteger(0);
+    private final AtomicLong lastContextStoppedTime = new AtomicLong(0L);
+
+    /**
+     * Most recent time in ms when a thread decided to kill itself to avoid
+     * potential memory leaks. Useful to throttle the rate of renewals of
+     * threads.
+     */
+    private final AtomicLong lastTimeThreadKilledItself = new AtomicLong(0L);
+
     /*
      * All user control parameters are declared as volatiles so that
      * ongoing actions are based on freshest values, but without need
@@ -496,6 +518,11 @@ public class ThreadPoolExecutor extends AbstractExecutorService {
      */
 
     /**
+     * Delay in ms between 2 threads being renewed. If negative, do not renew threads.
+     */
+    private volatile long threadRenewalDelay = Constants.DEFAULT_THREAD_RENEWAL_DELAY;
+
+    /**
      * Factory for new threads. All threads are created using this
      * factory (via method addWorker).  All callers must be prepared
      * for addWorker to fail, which may reflect a system or user's
@@ -555,8 +582,7 @@ public class ThreadPoolExecutor extends AbstractExecutorService {
     /**
      * The default rejected execution handler.
      */
-    private static final RejectedExecutionHandler defaultHandler =
-        new AbortPolicy();
+    private static final RejectedExecutionHandler defaultHandler = new RejectHandler();
 
     /**
      * Permission required for callers of shutdown and shutdownNow.
@@ -1336,6 +1362,31 @@ public class ThreadPoolExecutor extends AbstractExecutorService {
         this.handler = handler;
     }
 
+
+    @Override
+    public void execute(Runnable command) {
+        submittedCount.incrementAndGet();
+        try {
+            executeInternal(command);
+        } catch (RejectedExecutionException rx) {
+            if (getQueue() instanceof TaskQueue) {
+                // If the Executor is close to maximum pool size, concurrent
+                // calls to execute() may result (due to Tomcat's use of
+                // TaskQueue) in some tasks being rejected rather than queued.
+                // If this happens, add them to the queue.
+                final TaskQueue queue = (TaskQueue) getQueue();
+                if (!queue.force(command)) {
+                    submittedCount.decrementAndGet();
+                    throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
+                }
+            } else {
+                submittedCount.decrementAndGet();
+                throw rx;
+            }
+        }
+    }
+
+
     /**
      * Executes the given task sometime in the future.  The task
      * may execute in a new thread or in an existing pooled thread.
@@ -1350,8 +1401,7 @@ public class ThreadPoolExecutor extends AbstractExecutorService {
      *         cannot be accepted for execution
      * @throws NullPointerException if {@code command} is null
      */
-    @Override
-    public void execute(Runnable command) {
+    private void executeInternal(Runnable command) {
         if (command == null) {
             throw new NullPointerException();
         }
@@ -1764,6 +1814,17 @@ public class ThreadPoolExecutor extends AbstractExecutorService {
         return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
     }
 
+
+    public long getThreadRenewalDelay() {
+        return threadRenewalDelay;
+    }
+
+
+    public void setThreadRenewalDelay(long threadRenewalDelay) {
+        this.threadRenewalDelay = threadRenewalDelay;
+    }
+
+
     /* User-level queue utilities */
 
     /**
@@ -1834,6 +1895,37 @@ public class ThreadPoolExecutor extends AbstractExecutorService {
         tryTerminate(); // In case SHUTDOWN and now empty
     }
 
+
+    public void contextStopping() {
+        this.lastContextStoppedTime.set(System.currentTimeMillis());
+
+        // save the current pool parameters to restore them later
+        int savedCorePoolSize = this.getCorePoolSize();
+        TaskQueue taskQueue =
+                getQueue() instanceof TaskQueue ? (TaskQueue) getQueue() : null;
+        if (taskQueue != null) {
+            // note by slaurent : quite oddly threadPoolExecutor.setCorePoolSize
+            // checks that queue.remainingCapacity()==0. I did not understand
+            // why, but to get the intended effect of waking up idle threads, I
+            // temporarily fake this condition.
+            taskQueue.setForcedRemainingCapacity(0);
+        }
+
+        // setCorePoolSize(0) wakes idle threads
+        this.setCorePoolSize(0);
+
+        // TaskQueue.take() takes care of timing out, so that we are sure that
+        // all threads of the pool are renewed in a limited time, something like
+        // (threadKeepAlive + longest request time)
+
+        if (taskQueue != null) {
+            // ok, restore the state of the queue and pool
+            taskQueue.resetForcedRemainingCapacity();
+        }
+        this.setCorePoolSize(savedCorePoolSize);
+    }
+
+
     /* Statistics */
 
     /**
@@ -1940,6 +2032,12 @@ public class ThreadPoolExecutor extends AbstractExecutorService {
         }
     }
 
+
+    public int getSubmittedCount() {
+        return submittedCount.get();
+    }
+
+
     /**
      * Returns a string identifying this pool, as well as its state,
      * including indications of run state and estimated worker and
@@ -1998,6 +2096,7 @@ public class ThreadPoolExecutor extends AbstractExecutorService {
      */
     protected void beforeExecute(Thread t, Runnable r) { }
 
+
     /**
      * Method invoked upon completion of execution of the given Runnable.
      * This method is invoked by the thread that executed the task. If
@@ -2048,7 +2147,56 @@ public class ThreadPoolExecutor extends AbstractExecutorService {
      * @param t the exception that caused termination, or null if
      * execution completed normally
      */
-    protected void afterExecute(Runnable r, Throwable t) { }
+    protected void afterExecute(Runnable r, Throwable t) {
+        // Throwing StopPooledThreadException is likely to cause this method to
+        // be called more than once for a given task based on the typical
+        // implementations of the parent class. This test ensures that
+        // decrementAndGet() is only called once after each task execution.
+        if (!(t instanceof StopPooledThreadException)) {
+            submittedCount.decrementAndGet();
+        }
+
+        if (t == null) {
+            stopCurrentThreadIfNeeded();
+        }
+    }
+
+
+    /**
+     * If the current thread was started before the last time when a context was
+     * stopped, an exception is thrown so that the current thread is stopped.
+     */
+    protected void stopCurrentThreadIfNeeded() {
+        if (currentThreadShouldBeStopped()) {
+            long lastTime = lastTimeThreadKilledItself.longValue();
+            if (lastTime + threadRenewalDelay < System.currentTimeMillis()) {
+                if (lastTimeThreadKilledItself.compareAndSet(lastTime,
+                        System.currentTimeMillis() + 1)) {
+                    // OK, it's really time to dispose of this thread
+
+                    final String msg = sm.getString(
+                                    "threadPoolExecutor.threadStoppedToAvoidPotentialLeak",
+                                    Thread.currentThread().getName());
+
+                    throw new StopPooledThreadException(msg);
+                }
+            }
+        }
+    }
+
+
+    protected boolean currentThreadShouldBeStopped() {
+        if (threadRenewalDelay >= 0
+            && Thread.currentThread() instanceof TaskThread) {
+            TaskThread currentTaskThread = (TaskThread) Thread.currentThread();
+            if (currentTaskThread.getCreationTime() <
+                    this.lastContextStoppedTime.longValue()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
 
     /**
      * Method invoked when the Executor has terminated.  Default
@@ -2178,6 +2326,13 @@ public class ThreadPoolExecutor extends AbstractExecutorService {
         }
     }
 
+    private static class RejectHandler implements RejectedExecutionHandler {
+        @Override
+        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+            throw new RejectedExecutionException();
+        }
+    }
+
 
     public interface RejectedExecutionHandler {
 

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to