This is an automated email from the ASF dual-hosted git repository. remm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/tomcat.git
The following commit(s) were added to refs/heads/main by this push: new c0577c99c3 Implement ExecutorService c0577c99c3 is described below commit c0577c99c32ef67be2e6266fdde1d612035bbfdc Author: remm <r...@apache.org> AuthorDate: Fri Mar 1 09:50:55 2024 +0100 Implement ExecutorService This allows better interactions with NIO2. BZ68692 --- .../catalina/core/StandardThreadExecutor.java | 121 +++++++++++++++++++- .../core/StandardVirtualThreadExecutor.java | 124 ++++++++++++++++++++- .../apache/tomcat/util/net/LocalStrings.properties | 1 + java/org/apache/tomcat/util/net/Nio2Endpoint.java | 2 + .../util/threads/ScheduledThreadPoolExecutor.java | 3 +- webapps/docs/changelog.xml | 7 +- 6 files changed, 253 insertions(+), 5 deletions(-) diff --git a/java/org/apache/catalina/core/StandardThreadExecutor.java b/java/org/apache/catalina/core/StandardThreadExecutor.java index 83a95e6fac..162723dcd8 100644 --- a/java/org/apache/catalina/core/StandardThreadExecutor.java +++ b/java/org/apache/catalina/core/StandardThreadExecutor.java @@ -16,7 +16,15 @@ */ package org.apache.catalina.core; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.catalina.Executor; import org.apache.catalina.LifecycleException; @@ -28,7 +36,7 @@ import org.apache.tomcat.util.threads.TaskQueue; import org.apache.tomcat.util.threads.TaskThreadFactory; import org.apache.tomcat.util.threads.ThreadPoolExecutor; -public class StandardThreadExecutor extends LifecycleMBeanBase implements Executor, ResizableExecutor { +public class StandardThreadExecutor extends LifecycleMBeanBase implements Executor, ExecutorService, ResizableExecutor { protected static final StringManager sm = StringManager.getManager(StandardThreadExecutor.class); @@ -293,4 +301,115 @@ public class StandardThreadExecutor extends LifecycleMBeanBase implements Execut protected String getObjectNameKeyProperties() { return "type=Executor,name=" + getName(); } + + + @Override + public void shutdown() { + // Controlled by Lifecycle instead + } + + + @Override + public List<Runnable> shutdownNow() { + // Controlled by Lifecycle instead + return Collections.emptyList(); + } + + + @Override + public boolean isShutdown() { + if (executor != null) { + return executor.isShutdown(); + } else { + throw new IllegalStateException(sm.getString("standardThreadExecutor.notStarted")); + } + } + + + @Override + public boolean isTerminated() { + if (executor != null) { + return executor.isTerminated(); + } else { + throw new IllegalStateException(sm.getString("standardThreadExecutor.notStarted")); + } + } + + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return false; + } + + + @Override + public <T> Future<T> submit(Callable<T> task) { + if (executor != null) { + return executor.submit(task); + } else { + throw new IllegalStateException(sm.getString("standardThreadExecutor.notStarted")); + } + } + + + @Override + public <T> Future<T> submit(Runnable task, T result) { + if (executor != null) { + return executor.submit(task, result); + } else { + throw new IllegalStateException(sm.getString("standardThreadExecutor.notStarted")); + } + } + + + @Override + public Future<?> submit(Runnable task) { + if (executor != null) { + return executor.submit(task); + } else { + throw new IllegalStateException(sm.getString("standardThreadExecutor.notStarted")); + } + } + + + @Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { + if (executor != null) { + return executor.invokeAll(tasks); + } else { + throw new IllegalStateException(sm.getString("standardThreadExecutor.notStarted")); + } + } + + + @Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) + throws InterruptedException { + if (executor != null) { + return executor.invokeAll(tasks, timeout, unit); + } else { + throw new IllegalStateException(sm.getString("standardThreadExecutor.notStarted")); + } + } + + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { + if (executor != null) { + return executor.invokeAny(tasks); + } else { + throw new IllegalStateException(sm.getString("standardThreadExecutor.notStarted")); + } + } + + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + if (executor != null) { + return executor.invokeAny(tasks, timeout, unit); + } else { + throw new IllegalStateException(sm.getString("standardThreadExecutor.notStarted")); + } + } } diff --git a/java/org/apache/catalina/core/StandardVirtualThreadExecutor.java b/java/org/apache/catalina/core/StandardVirtualThreadExecutor.java index 9180582d54..2369ec95a5 100644 --- a/java/org/apache/catalina/core/StandardVirtualThreadExecutor.java +++ b/java/org/apache/catalina/core/StandardVirtualThreadExecutor.java @@ -16,6 +16,16 @@ */ package org.apache.catalina.core; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + import org.apache.catalina.Executor; import org.apache.catalina.LifecycleException; import org.apache.catalina.LifecycleState; @@ -26,12 +36,12 @@ import org.apache.tomcat.util.threads.VirtualThreadExecutor; /** * An executor that uses a new virtual thread for each task. */ -public class StandardVirtualThreadExecutor extends LifecycleMBeanBase implements Executor { +public class StandardVirtualThreadExecutor extends LifecycleMBeanBase implements Executor, ExecutorService { private static final StringManager sm = StringManager.getManager(StandardVirtualThreadExecutor.class); private String name; - private java.util.concurrent.Executor executor; + private java.util.concurrent.ExecutorService executor; private String namePrefix = "tomcat-virt-"; public void setName(String name) { @@ -83,4 +93,114 @@ public class StandardVirtualThreadExecutor extends LifecycleMBeanBase implements protected String getObjectNameKeyProperties() { return "type=Executor,name=" + getName(); } + + @Override + public void shutdown() { + // Controlled by Lifecycle instead + } + + + @Override + public List<Runnable> shutdownNow() { + // Controlled by Lifecycle instead + return Collections.emptyList(); + } + + + @Override + public boolean isShutdown() { + if (executor != null) { + return executor.isShutdown(); + } else { + throw new IllegalStateException(sm.getString("standardThreadExecutor.notStarted")); + } + } + + + @Override + public boolean isTerminated() { + if (executor != null) { + return executor.isTerminated(); + } else { + throw new IllegalStateException(sm.getString("standardThreadExecutor.notStarted")); + } + } + + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return false; + } + + + @Override + public <T> Future<T> submit(Callable<T> task) { + if (executor != null) { + return executor.submit(task); + } else { + throw new IllegalStateException(sm.getString("standardThreadExecutor.notStarted")); + } + } + + + @Override + public <T> Future<T> submit(Runnable task, T result) { + if (executor != null) { + return executor.submit(task, result); + } else { + throw new IllegalStateException(sm.getString("standardThreadExecutor.notStarted")); + } + } + + + @Override + public Future<?> submit(Runnable task) { + if (executor != null) { + return executor.submit(task); + } else { + throw new IllegalStateException(sm.getString("standardThreadExecutor.notStarted")); + } + } + + + @Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { + if (executor != null) { + return executor.invokeAll(tasks); + } else { + throw new IllegalStateException(sm.getString("standardThreadExecutor.notStarted")); + } + } + + + @Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) + throws InterruptedException { + if (executor != null) { + return executor.invokeAll(tasks, timeout, unit); + } else { + throw new IllegalStateException(sm.getString("standardThreadExecutor.notStarted")); + } + } + + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { + if (executor != null) { + return executor.invokeAny(tasks); + } else { + throw new IllegalStateException(sm.getString("standardThreadExecutor.notStarted")); + } + } + + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + if (executor != null) { + return executor.invokeAny(tasks, timeout, unit); + } else { + throw new IllegalStateException(sm.getString("standardThreadExecutor.notStarted")); + } + } } \ No newline at end of file diff --git a/java/org/apache/tomcat/util/net/LocalStrings.properties b/java/org/apache/tomcat/util/net/LocalStrings.properties index e7c5db9aad..20460c9326 100644 --- a/java/org/apache/tomcat/util/net/LocalStrings.properties +++ b/java/org/apache/tomcat/util/net/LocalStrings.properties @@ -93,6 +93,7 @@ endpoint.nio.stopLatchAwaitFail=The pollers did not stop within the expected tim endpoint.nio.stopLatchAwaitInterrupted=This thread was interrupted while waiting for the pollers to stop endpoint.nio.timeoutCme=Exception during processing of timeouts. The code has been checked repeatedly and no concurrent modification has been found. If you are able to repeat this error please open a Tomcat bug and provide the steps to reproduce. endpoint.nio2.exclusiveExecutor=The NIO2 connector requires an exclusive executor to operate properly on shutdown +endpoint.nio2.executorService=The NIO2 connector requires an executor service, the internal JVM threads will be used endpoint.noSslHostConfig=No SSLHostConfig element was found with the hostName [{0}] to match the defaultSSLHostConfigName for the connector [{1}] endpoint.noSslHostName=No host name was provided for the SSL host configuration endpoint.poll.error=Unexpected poller error diff --git a/java/org/apache/tomcat/util/net/Nio2Endpoint.java b/java/org/apache/tomcat/util/net/Nio2Endpoint.java index 8f831fd70e..6aba831e9d 100644 --- a/java/org/apache/tomcat/util/net/Nio2Endpoint.java +++ b/java/org/apache/tomcat/util/net/Nio2Endpoint.java @@ -119,6 +119,8 @@ public class Nio2Endpoint extends AbstractNetworkChannelEndpoint<Nio2Channel,Asy } if (getExecutor() instanceof ExecutorService) { threadGroup = AsynchronousChannelGroup.withThreadPool((ExecutorService) getExecutor()); + } else { + log.info(sm.getString("endpoint.nio2.executorService")); } // AsynchronousChannelGroup needs exclusive access to its executor service if (!internalExecutor) { diff --git a/java/org/apache/tomcat/util/threads/ScheduledThreadPoolExecutor.java b/java/org/apache/tomcat/util/threads/ScheduledThreadPoolExecutor.java index 98d10cc5b4..45d9c93c50 100644 --- a/java/org/apache/tomcat/util/threads/ScheduledThreadPoolExecutor.java +++ b/java/org/apache/tomcat/util/threads/ScheduledThreadPoolExecutor.java @@ -17,6 +17,7 @@ package org.apache.tomcat.util.threads; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -50,7 +51,7 @@ public class ScheduledThreadPoolExecutor implements ScheduledExecutorService { @Override public List<Runnable> shutdownNow() { - return null; + return Collections.emptyList(); } @Override diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml index 83ea4cb230..ddde7f947e 100644 --- a/webapps/docs/changelog.xml +++ b/webapps/docs/changelog.xml @@ -123,7 +123,12 @@ <code>OutputStream</code>. Ensure use of either once the response has been recycled triggers a <code>NullPointerException</code> provided that <code>discardFacades</code> is configured with the default value of - <code>true</code>. + <code>true</code>. (markt) + </fix> + <fix> + <bug>68692</bug>: The standard thread pool implementations that are + configured using the <code>Executor</code> element now implement + <code>ExecutorService</code> for better support NIO2. (remm) </fix> </changelog> </subsection> --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org