Author: remm Date: Thu Nov 8 10:51:37 2018 New Revision: 1846122 URL: http://svn.apache.org/viewvc?rev=1846122&view=rev Log: Refactor async timeout threads of the connectors using a scheduled executor.
Modified: tomcat/trunk/java/org/apache/catalina/connector/Connector.java tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java tomcat/trunk/java/org/apache/coyote/ProtocolHandler.java tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java tomcat/trunk/java/org/apache/tomcat/util/net/LocalStrings.properties tomcat/trunk/webapps/docs/changelog.xml Modified: tomcat/trunk/java/org/apache/catalina/connector/Connector.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/connector/Connector.java?rev=1846122&r1=1846121&r2=1846122&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/connector/Connector.java (original) +++ tomcat/trunk/java/org/apache/catalina/connector/Connector.java Thu Nov 8 10:51:37 2018 @@ -948,6 +948,9 @@ public class Connector extends Lifecycle // Initialize adapter adapter = new CoyoteAdapter(this); protocolHandler.setAdapter(adapter); + if (service != null) { + protocolHandler.setUtilityExecutor(service.getUtilityExecutor()); + } // Make sure parseBodyMethodsSet has a default if (null == parseBodyMethodsSet) { Modified: tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java?rev=1846122&r1=1846121&r2=1846122&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java (original) +++ tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java Thu Nov 8 10:51:37 2018 @@ -23,6 +23,9 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -90,12 +93,10 @@ public abstract class AbstractProtocol<S private final Set<Processor> waitingProcessors = Collections.newSetFromMap(new ConcurrentHashMap<Processor, Boolean>()); - /** - * The async timeout thread. + * Controller for the async timeout scheduling. */ - private AsyncTimeout asyncTimeout = null; - + private ScheduledFuture<?> asyncTimeoutFuture = null; public AbstractProtocol(AbstractEndpoint<S,?> endpoint) { this.endpoint = endpoint; @@ -201,20 +202,24 @@ public abstract class AbstractProtocol<S } - public AsyncTimeout getAsyncTimeout() { - return asyncTimeout; - } - - // ---------------------- Properties that are passed through to the EndPoint @Override public Executor getExecutor() { return endpoint.getExecutor(); } + @Override public void setExecutor(Executor executor) { endpoint.setExecutor(executor); } + @Override + public ScheduledExecutorService getUtilityExecutor() { return endpoint.getUtilityExecutor(); } + @Override + public void setUtilityExecutor(ScheduledExecutorService utilityExecutor) { + endpoint.setUtilityExecutor(utilityExecutor); + } + + public int getMaxThreads() { return endpoint.getMaxThreads(); } public void setMaxThreads(int maxThreads) { endpoint.setMaxThreads(maxThreads); @@ -559,19 +564,36 @@ public abstract class AbstractProtocol<S } endpoint.start(); + startAsyncTimeout(); + } + + + protected void startAsyncTimeout() { + if (asyncTimeoutFuture != null) { + return; + } + asyncTimeoutFuture = getUtilityExecutor().scheduleWithFixedDelay( + new Runnable() { + @Override + public void run() { + if (!endpoint.isPaused()) { + long now = System.currentTimeMillis(); + for (Processor processor : waitingProcessors) { + processor.timeoutAsync(now); + } + } + } - // Start async timeout thread - asyncTimeout = new AsyncTimeout(); - Thread timeoutThread = new Thread(asyncTimeout, getNameInternal() + "-AsyncTimeout"); - int priority = endpoint.getThreadPriority(); - if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) { - priority = Thread.NORM_PRIORITY; - } - timeoutThread.setPriority(priority); - timeoutThread.setDaemon(true); - timeoutThread.start(); + }, 1, 1, TimeUnit.SECONDS); } + protected void stopAsyncTimeout() { + if (asyncTimeoutFuture == null) { + return; + } + asyncTimeoutFuture.cancel(false); + asyncTimeoutFuture = null; + } @Override public void pause() throws Exception { @@ -579,6 +601,7 @@ public abstract class AbstractProtocol<S getLog().info(sm.getString("abstractProtocolHandler.pause", getName())); } + stopAsyncTimeout(); endpoint.pause(); } @@ -595,6 +618,7 @@ public abstract class AbstractProtocol<S } endpoint.resume(); + startAsyncTimeout(); } @@ -605,8 +629,10 @@ public abstract class AbstractProtocol<S logPortOffset(); } - if (asyncTimeout != null) { - asyncTimeout.stop(); + stopAsyncTimeout(); + // Timeout any pending async request + for (Processor processor : waitingProcessors) { + processor.timeoutAsync(-1); } endpoint.stop(); @@ -1113,52 +1139,4 @@ public abstract class AbstractProtocol<S } } - - /** - * Async timeout thread - */ - protected class AsyncTimeout implements Runnable { - - private volatile boolean asyncTimeoutRunning = true; - - /** - * The background thread that checks async requests and fires the - * timeout if there has been no activity. - */ - @Override - public void run() { - - // Loop until we receive a shutdown command - while (asyncTimeoutRunning) { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - // Ignore - } - long now = System.currentTimeMillis(); - for (Processor processor : waitingProcessors) { - processor.timeoutAsync(now); - } - - // Loop if endpoint is paused - while (endpoint.isPaused() && asyncTimeoutRunning) { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - // Ignore - } - } - } - } - - - protected void stop() { - asyncTimeoutRunning = false; - - // Timeout any pending async request - for (Processor processor : waitingProcessors) { - processor.timeoutAsync(-1); - } - } - } } Modified: tomcat/trunk/java/org/apache/coyote/ProtocolHandler.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ProtocolHandler.java?rev=1846122&r1=1846121&r2=1846122&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/ProtocolHandler.java (original) +++ tomcat/trunk/java/org/apache/coyote/ProtocolHandler.java Thu Nov 8 10:51:37 2018 @@ -17,15 +17,14 @@ package org.apache.coyote; import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; import org.apache.tomcat.util.net.SSLHostConfig; /** * Abstract the protocol implementation, including threading, etc. - * Processor is single threaded and specific to stream-based protocols, - * will not fit Jk protocols like JNI. * - * This is the main interface to be implemented by a coyote connector. + * This is the main interface to be implemented by a coyote protocol. * Adapter is the main interface to be implemented by a coyote servlet * container. * @@ -36,12 +35,18 @@ import org.apache.tomcat.util.net.SSLHos public interface ProtocolHandler { /** + * Return the adapter associated with the protocol handler. + * @return the adapter + */ + public Adapter getAdapter(); + + + /** * The adapter, used to call the connector. * * @param adapter The adapter to associate */ public void setAdapter(Adapter adapter); - public Adapter getAdapter(); /** @@ -53,6 +58,27 @@ public interface ProtocolHandler { /** + * Set the optional executor that will be used by the connector. + * @param executor the executor + */ + public void setExecutor(Executor executor); + + + /** + * Get the utility executor that should be used by the protocol handler. + * @return the executor + */ + public ScheduledExecutorService getUtilityExecutor(); + + + /** + * Set the utility executor that should be used by the protocol handler. + * @param utilityExecutor the executor + */ + public void setUtilityExecutor(ScheduledExecutorService utilityExecutor); + + + /** * Initialise the protocol. * * @throws Exception If the protocol handler fails to initialise @@ -126,10 +152,32 @@ public interface ProtocolHandler { public boolean isSendfileSupported(); + /** + * Add a new SSL configuration for a virtual host. + * @param sslHostConfig the configuration + */ public void addSslHostConfig(SSLHostConfig sslHostConfig); + + + /** + * Find all configured SSL virtual host configurations which will be used + * by SNI. + * @return the configurations + */ public SSLHostConfig[] findSslHostConfigs(); + /** + * Add a new protocol for used by HTTP/1.1 upgrade or ALPN. + * @param upgradeProtocol the protocol + */ public void addUpgradeProtocol(UpgradeProtocol upgradeProtocol); + + + /** + * Return all configured upgrade protocols. + * @return the protocols + */ public UpgradeProtocol[] findUpgradeProtocols(); + } Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java?rev=1846122&r1=1846121&r2=1846122&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java Thu Nov 8 10:51:37 2018 @@ -31,6 +31,8 @@ import java.util.concurrent.ConcurrentHa import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.management.MalformedObjectNameException; @@ -449,6 +451,22 @@ public abstract class AbstractEndpoint<S /** + * External Executor based thread pool for utility tasks. + */ + private ScheduledExecutorService utilityExecutor = null; + public void setUtilityExecutor(ScheduledExecutorService utilityExecutor) { + this.utilityExecutor = utilityExecutor; + } + public ScheduledExecutorService getUtilityExecutor() { + if (utilityExecutor == null) { + getLog().warn(sm.getString("endpoint.warn.noUtilityExecutor")); + utilityExecutor = new ScheduledThreadPoolExecutor(1); + } + return utilityExecutor; + } + + + /** * Server socket port. */ private int port = -1; @@ -1153,7 +1171,7 @@ public abstract class AbstractEndpoint<S } - protected final void startAcceptorThreads() { + protected void startAcceptorThreads() { int count = getAcceptorThreadCount(); acceptors = new ArrayList<>(count); Modified: tomcat/trunk/java/org/apache/tomcat/util/net/LocalStrings.properties URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/LocalStrings.properties?rev=1846122&r1=1846121&r2=1846122&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/LocalStrings.properties [UTF-8] (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/LocalStrings.properties [UTF-8] Thu Nov 8 10:51:37 2018 @@ -26,6 +26,7 @@ endpoint.warn.noLocalAddr=Unable to dete endpoint.warn.noLocalName=Unable to determine local host name for socket [{0}] endpoint.warn.noLocalPort=Unable to determine local port for socket [{0}] endpoint.warn.noSendfileWithSSL=Sendfile is not supported for the connector when SSL is enabled +endpoint.warn.noUtilityExecutor=No utility executor was set, creating one endpoint.warn.incorrectConnectionCount=Incorrect connection count, multiple socket.close called on the same socket. endpoint.debug.channelCloseFail=Failed to close channel endpoint.debug.destroySocket=Destroying socket [{0}] Modified: tomcat/trunk/webapps/docs/changelog.xml URL: http://svn.apache.org/viewvc/tomcat/trunk/webapps/docs/changelog.xml?rev=1846122&r1=1846121&r2=1846122&view=diff ============================================================================== --- tomcat/trunk/webapps/docs/changelog.xml (original) +++ tomcat/trunk/webapps/docs/changelog.xml Thu Nov 8 10:51:37 2018 @@ -77,6 +77,14 @@ </update> </changelog> </subsection> + <subsection name="Coyote"> + <changelog> + <update> + Refactor connector async timeout threads using a scheduled executor. + (remm) + </update> + </changelog> + </subsection> <subsection name="Tribes"> <changelog> <update> --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org