This is an automated email from the ASF dual-hosted git repository. markt pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/tomcat.git
commit 4c314648eccc75c0f8108b3ed394dea69bf75593 Author: Mark Thomas <ma...@apache.org> AuthorDate: Wed Jul 21 16:36:32 2021 +0100 Merge in changes from Tomcat's extended ThreadPoolExecutor --- .../tomcat/util/threads/ThreadPoolExecutor.java | 165 ++++++++++++++++++++- 1 file changed, 160 insertions(+), 5 deletions(-) diff --git a/java/org/apache/tomcat/util/threads/ThreadPoolExecutor.java b/java/org/apache/tomcat/util/threads/ThreadPoolExecutor.java index 5193209..7b1ec55 100644 --- a/java/org/apache/tomcat/util/threads/ThreadPoolExecutor.java +++ b/java/org/apache/tomcat/util/threads/ThreadPoolExecutor.java @@ -37,10 +37,13 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; +import org.apache.tomcat.util.res.StringManager; + /** * An {@link java.util.concurrent.ExecutorService} * that executes each submitted task using @@ -325,6 +328,9 @@ import java.util.concurrent.locks.ReentrantLock; * @author Doug Lea */ public class ThreadPoolExecutor extends AbstractExecutorService { + + protected static final StringManager sm = StringManager.getManager("org.apache.tomcat.util.threads.res"); + /** * The main pool control state, ctl, is an atomic integer packing * two conceptual fields @@ -488,6 +494,22 @@ public class ThreadPoolExecutor extends AbstractExecutorService { */ private long completedTaskCount; + /** + * The number of tasks submitted but not yet finished. This includes tasks + * in the queue and tasks that have been handed to a worker thread but the + * latter did not start executing the task yet. + * This number is always greater or equal to {@link #getActiveCount()}. + */ + private final AtomicInteger submittedCount = new AtomicInteger(0); + private final AtomicLong lastContextStoppedTime = new AtomicLong(0L); + + /** + * Most recent time in ms when a thread decided to kill itself to avoid + * potential memory leaks. Useful to throttle the rate of renewals of + * threads. + */ + private final AtomicLong lastTimeThreadKilledItself = new AtomicLong(0L); + /* * All user control parameters are declared as volatiles so that * ongoing actions are based on freshest values, but without need @@ -496,6 +518,11 @@ public class ThreadPoolExecutor extends AbstractExecutorService { */ /** + * Delay in ms between 2 threads being renewed. If negative, do not renew threads. + */ + private volatile long threadRenewalDelay = Constants.DEFAULT_THREAD_RENEWAL_DELAY; + + /** * Factory for new threads. All threads are created using this * factory (via method addWorker). All callers must be prepared * for addWorker to fail, which may reflect a system or user's @@ -555,8 +582,7 @@ public class ThreadPoolExecutor extends AbstractExecutorService { /** * The default rejected execution handler. */ - private static final RejectedExecutionHandler defaultHandler = - new AbortPolicy(); + private static final RejectedExecutionHandler defaultHandler = new RejectHandler(); /** * Permission required for callers of shutdown and shutdownNow. @@ -1336,6 +1362,31 @@ public class ThreadPoolExecutor extends AbstractExecutorService { this.handler = handler; } + + @Override + public void execute(Runnable command) { + submittedCount.incrementAndGet(); + try { + executeInternal(command); + } catch (RejectedExecutionException rx) { + if (getQueue() instanceof TaskQueue) { + // If the Executor is close to maximum pool size, concurrent + // calls to execute() may result (due to Tomcat's use of + // TaskQueue) in some tasks being rejected rather than queued. + // If this happens, add them to the queue. + final TaskQueue queue = (TaskQueue) getQueue(); + if (!queue.force(command)) { + submittedCount.decrementAndGet(); + throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull")); + } + } else { + submittedCount.decrementAndGet(); + throw rx; + } + } + } + + /** * Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. @@ -1350,8 +1401,7 @@ public class ThreadPoolExecutor extends AbstractExecutorService { * cannot be accepted for execution * @throws NullPointerException if {@code command} is null */ - @Override - public void execute(Runnable command) { + private void executeInternal(Runnable command) { if (command == null) { throw new NullPointerException(); } @@ -1764,6 +1814,17 @@ public class ThreadPoolExecutor extends AbstractExecutorService { return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS); } + + public long getThreadRenewalDelay() { + return threadRenewalDelay; + } + + + public void setThreadRenewalDelay(long threadRenewalDelay) { + this.threadRenewalDelay = threadRenewalDelay; + } + + /* User-level queue utilities */ /** @@ -1834,6 +1895,37 @@ public class ThreadPoolExecutor extends AbstractExecutorService { tryTerminate(); // In case SHUTDOWN and now empty } + + public void contextStopping() { + this.lastContextStoppedTime.set(System.currentTimeMillis()); + + // save the current pool parameters to restore them later + int savedCorePoolSize = this.getCorePoolSize(); + TaskQueue taskQueue = + getQueue() instanceof TaskQueue ? (TaskQueue) getQueue() : null; + if (taskQueue != null) { + // note by slaurent : quite oddly threadPoolExecutor.setCorePoolSize + // checks that queue.remainingCapacity()==0. I did not understand + // why, but to get the intended effect of waking up idle threads, I + // temporarily fake this condition. + taskQueue.setForcedRemainingCapacity(0); + } + + // setCorePoolSize(0) wakes idle threads + this.setCorePoolSize(0); + + // TaskQueue.take() takes care of timing out, so that we are sure that + // all threads of the pool are renewed in a limited time, something like + // (threadKeepAlive + longest request time) + + if (taskQueue != null) { + // ok, restore the state of the queue and pool + taskQueue.resetForcedRemainingCapacity(); + } + this.setCorePoolSize(savedCorePoolSize); + } + + /* Statistics */ /** @@ -1940,6 +2032,12 @@ public class ThreadPoolExecutor extends AbstractExecutorService { } } + + public int getSubmittedCount() { + return submittedCount.get(); + } + + /** * Returns a string identifying this pool, as well as its state, * including indications of run state and estimated worker and @@ -1998,6 +2096,7 @@ public class ThreadPoolExecutor extends AbstractExecutorService { */ protected void beforeExecute(Thread t, Runnable r) { } + /** * Method invoked upon completion of execution of the given Runnable. * This method is invoked by the thread that executed the task. If @@ -2048,7 +2147,56 @@ public class ThreadPoolExecutor extends AbstractExecutorService { * @param t the exception that caused termination, or null if * execution completed normally */ - protected void afterExecute(Runnable r, Throwable t) { } + protected void afterExecute(Runnable r, Throwable t) { + // Throwing StopPooledThreadException is likely to cause this method to + // be called more than once for a given task based on the typical + // implementations of the parent class. This test ensures that + // decrementAndGet() is only called once after each task execution. + if (!(t instanceof StopPooledThreadException)) { + submittedCount.decrementAndGet(); + } + + if (t == null) { + stopCurrentThreadIfNeeded(); + } + } + + + /** + * If the current thread was started before the last time when a context was + * stopped, an exception is thrown so that the current thread is stopped. + */ + protected void stopCurrentThreadIfNeeded() { + if (currentThreadShouldBeStopped()) { + long lastTime = lastTimeThreadKilledItself.longValue(); + if (lastTime + threadRenewalDelay < System.currentTimeMillis()) { + if (lastTimeThreadKilledItself.compareAndSet(lastTime, + System.currentTimeMillis() + 1)) { + // OK, it's really time to dispose of this thread + + final String msg = sm.getString( + "threadPoolExecutor.threadStoppedToAvoidPotentialLeak", + Thread.currentThread().getName()); + + throw new StopPooledThreadException(msg); + } + } + } + } + + + protected boolean currentThreadShouldBeStopped() { + if (threadRenewalDelay >= 0 + && Thread.currentThread() instanceof TaskThread) { + TaskThread currentTaskThread = (TaskThread) Thread.currentThread(); + if (currentTaskThread.getCreationTime() < + this.lastContextStoppedTime.longValue()) { + return true; + } + } + return false; + } + /** * Method invoked when the Executor has terminated. Default @@ -2178,6 +2326,13 @@ public class ThreadPoolExecutor extends AbstractExecutorService { } } + private static class RejectHandler implements RejectedExecutionHandler { + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + throw new RejectedExecutionException(); + } + } + public interface RejectedExecutionHandler { --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org