Author: remm
Date: Thu Nov  8 10:51:37 2018
New Revision: 1846122

URL: http://svn.apache.org/viewvc?rev=1846122&view=rev
Log:
Refactor async timeout threads of the connectors using a scheduled executor.

Modified:
    tomcat/trunk/java/org/apache/catalina/connector/Connector.java
    tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java
    tomcat/trunk/java/org/apache/coyote/ProtocolHandler.java
    tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java
    tomcat/trunk/java/org/apache/tomcat/util/net/LocalStrings.properties
    tomcat/trunk/webapps/docs/changelog.xml

Modified: tomcat/trunk/java/org/apache/catalina/connector/Connector.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/connector/Connector.java?rev=1846122&r1=1846121&r2=1846122&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/connector/Connector.java (original)
+++ tomcat/trunk/java/org/apache/catalina/connector/Connector.java Thu Nov  8 
10:51:37 2018
@@ -948,6 +948,9 @@ public class Connector extends Lifecycle
         // Initialize adapter
         adapter = new CoyoteAdapter(this);
         protocolHandler.setAdapter(adapter);
+        if (service != null) {
+            protocolHandler.setUtilityExecutor(service.getUtilityExecutor());
+        }
 
         // Make sure parseBodyMethodsSet has a default
         if (null == parseBodyMethodsSet) {

Modified: tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java?rev=1846122&r1=1846121&r2=1846122&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java (original)
+++ tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java Thu Nov  8 
10:51:37 2018
@@ -23,6 +23,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -90,12 +93,10 @@ public abstract class AbstractProtocol<S
     private final Set<Processor> waitingProcessors =
             Collections.newSetFromMap(new ConcurrentHashMap<Processor, Boolean>());
 
-
     /**
-     * The async timeout thread.
+     * Controller for the async timeout scheduling.
      */
-    private AsyncTimeout asyncTimeout = null;
-
+    private ScheduledFuture<?> asyncTimeoutFuture = null;
 
     public AbstractProtocol(AbstractEndpoint<S,?> endpoint) {
         this.endpoint = endpoint;
@@ -201,20 +202,24 @@ public abstract class AbstractProtocol<S
     }
 
 
-    public AsyncTimeout getAsyncTimeout() {
-        return asyncTimeout;
-    }
-
-
     // ---------------------- Properties that are passed through to the EndPoint
 
     @Override
     public Executor getExecutor() { return endpoint.getExecutor(); }
+    @Override
     public void setExecutor(Executor executor) {
         endpoint.setExecutor(executor);
     }
 
 
+    @Override
+    public ScheduledExecutorService getUtilityExecutor() { return endpoint.getUtilityExecutor(); }
+    @Override
+    public void setUtilityExecutor(ScheduledExecutorService utilityExecutor) {
+        endpoint.setUtilityExecutor(utilityExecutor);
+    }
+
+
     public int getMaxThreads() { return endpoint.getMaxThreads(); }
     public void setMaxThreads(int maxThreads) {
         endpoint.setMaxThreads(maxThreads);
@@ -559,19 +564,36 @@ public abstract class AbstractProtocol<S
         }
 
         endpoint.start();
+        startAsyncTimeout();
+    }
+
+
+    protected void startAsyncTimeout() {
+        if (asyncTimeoutFuture != null) {
+            return;
+        }
+        asyncTimeoutFuture = getUtilityExecutor().scheduleWithFixedDelay(
+                new Runnable() {
+                    @Override
+                    public void run() {
+                        if (!endpoint.isPaused()) {
+                            long now = System.currentTimeMillis();
+                            for (Processor processor : waitingProcessors) {
+                                processor.timeoutAsync(now);
+                            }
+                        }
+                    }
 
-        // Start async timeout thread
-        asyncTimeout = new AsyncTimeout();
-        Thread timeoutThread = new Thread(asyncTimeout, getNameInternal() + "-AsyncTimeout");
-        int priority = endpoint.getThreadPriority();
-        if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
-            priority = Thread.NORM_PRIORITY;
-        }
-        timeoutThread.setPriority(priority);
-        timeoutThread.setDaemon(true);
-        timeoutThread.start();
+                }, 1, 1, TimeUnit.SECONDS);
     }
 
+    protected void stopAsyncTimeout() {
+        if (asyncTimeoutFuture == null) {
+            return;
+        }
+        asyncTimeoutFuture.cancel(false);
+        asyncTimeoutFuture = null;
+    }
 
     @Override
     public void pause() throws Exception {
@@ -579,6 +601,7 @@ public abstract class AbstractProtocol<S
             getLog().info(sm.getString("abstractProtocolHandler.pause", getName()));
         }
 
+        stopAsyncTimeout();
         endpoint.pause();
     }
 
@@ -595,6 +618,7 @@ public abstract class AbstractProtocol<S
         }
 
         endpoint.resume();
+        startAsyncTimeout();
     }
 
 
@@ -605,8 +629,10 @@ public abstract class AbstractProtocol<S
             logPortOffset();
         }
 
-        if (asyncTimeout != null) {
-            asyncTimeout.stop();
+        stopAsyncTimeout();
+        // Timeout any pending async request
+        for (Processor processor : waitingProcessors) {
+            processor.timeoutAsync(-1);
         }
 
         endpoint.stop();
@@ -1113,52 +1139,4 @@ public abstract class AbstractProtocol<S
         }
     }
 
-
-    /**
-     * Async timeout thread
-     */
-    protected class AsyncTimeout implements Runnable {
-
-        private volatile boolean asyncTimeoutRunning = true;
-
-        /**
-         * The background thread that checks async requests and fires the
-         * timeout if there has been no activity.
-         */
-        @Override
-        public void run() {
-
-            // Loop until we receive a shutdown command
-            while (asyncTimeoutRunning) {
-                try {
-                    Thread.sleep(1000);
-                } catch (InterruptedException e) {
-                    // Ignore
-                }
-                long now = System.currentTimeMillis();
-                for (Processor processor : waitingProcessors) {
-                   processor.timeoutAsync(now);
-                }
-
-                // Loop if endpoint is paused
-                while (endpoint.isPaused() && asyncTimeoutRunning) {
-                    try {
-                        Thread.sleep(1000);
-                    } catch (InterruptedException e) {
-                        // Ignore
-                    }
-                }
-            }
-        }
-
-
-        protected void stop() {
-            asyncTimeoutRunning = false;
-
-            // Timeout any pending async request
-            for (Processor processor : waitingProcessors) {
-                processor.timeoutAsync(-1);
-            }
-        }
-    }
 }

Modified: tomcat/trunk/java/org/apache/coyote/ProtocolHandler.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ProtocolHandler.java?rev=1846122&r1=1846121&r2=1846122&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/ProtocolHandler.java (original)
+++ tomcat/trunk/java/org/apache/coyote/ProtocolHandler.java Thu Nov  8 
10:51:37 2018
@@ -17,15 +17,14 @@
 package org.apache.coyote;
 
 import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.tomcat.util.net.SSLHostConfig;
 
 /**
  * Abstract the protocol implementation, including threading, etc.
- * Processor is single threaded and specific to stream-based protocols,
- * will not fit Jk protocols like JNI.
  *
- * This is the main interface to be implemented by a coyote connector.
+ * This is the main interface to be implemented by a coyote protocol.
  * Adapter is the main interface to be implemented by a coyote servlet
  * container.
  *
@@ -36,12 +35,18 @@ import org.apache.tomcat.util.net.SSLHos
 public interface ProtocolHandler {
 
     /**
+     * Return the adapter associated with the protocol handler.
+     * @return the adapter
+     */
+    public Adapter getAdapter();
+
+
+    /**
      * The adapter, used to call the connector.
      *
      * @param adapter The adapter to associate
      */
     public void setAdapter(Adapter adapter);
-    public Adapter getAdapter();
 
 
     /**
@@ -53,6 +58,27 @@ public interface ProtocolHandler {
 
 
     /**
+     * Set the optional executor that will be used by the connector.
+     * @param executor the executor
+     */
+    public void setExecutor(Executor executor);
+
+
+    /**
+     * Get the utility executor that should be used by the protocol handler.
+     * @return the executor
+     */
+    public ScheduledExecutorService getUtilityExecutor();
+
+
+    /**
+     * Set the utility executor that should be used by the protocol handler.
+     * @param utilityExecutor the executor
+     */
+    public void setUtilityExecutor(ScheduledExecutorService utilityExecutor);
+
+
+    /**
      * Initialise the protocol.
      *
      * @throws Exception If the protocol handler fails to initialise
@@ -126,10 +152,32 @@ public interface ProtocolHandler {
     public boolean isSendfileSupported();
 
 
+    /**
+     * Add a new SSL configuration for a virtual host.
+     * @param sslHostConfig the configuration
+     */
     public void addSslHostConfig(SSLHostConfig sslHostConfig);
+
+
+    /**
+     * Find all configured SSL virtual host configurations which will be used
+     * by SNI.
+     * @return the configurations
+     */
     public SSLHostConfig[] findSslHostConfigs();
 
 
+    /**
+     * Add a new protocol for use by HTTP/1.1 upgrade or ALPN.
+     * @param upgradeProtocol the protocol
+     */
     public void addUpgradeProtocol(UpgradeProtocol upgradeProtocol);
+
+
+    /**
+     * Return all configured upgrade protocols.
+     * @return the protocols
+     */
     public UpgradeProtocol[] findUpgradeProtocols();
+
 }

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java?rev=1846122&r1=1846121&r2=1846122&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java 
(original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java Thu Nov  
8 10:51:37 2018
@@ -31,6 +31,8 @@ import java.util.concurrent.ConcurrentHa
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import javax.management.MalformedObjectNameException;
@@ -449,6 +451,22 @@ public abstract class AbstractEndpoint<S
 
 
     /**
+     * External Executor based thread pool for utility tasks.
+     */
+    private ScheduledExecutorService utilityExecutor = null;
+    public void setUtilityExecutor(ScheduledExecutorService utilityExecutor) {
+        this.utilityExecutor = utilityExecutor;
+    }
+    public ScheduledExecutorService getUtilityExecutor() {
+        if (utilityExecutor == null) {
+            getLog().warn(sm.getString("endpoint.warn.noUtilityExecutor"));
+            utilityExecutor = new ScheduledThreadPoolExecutor(1);
+        }
+        return utilityExecutor;
+    }
+
+
+    /**
      * Server socket port.
      */
     private int port = -1;
@@ -1153,7 +1171,7 @@ public abstract class AbstractEndpoint<S
     }
 
 
-    protected final void startAcceptorThreads() {
+    protected void startAcceptorThreads() {
         int count = getAcceptorThreadCount();
         acceptors = new ArrayList<>(count);
 

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/LocalStrings.properties
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/LocalStrings.properties?rev=1846122&r1=1846121&r2=1846122&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/LocalStrings.properties 
[UTF-8] (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/LocalStrings.properties 
[UTF-8] Thu Nov  8 10:51:37 2018
@@ -26,6 +26,7 @@ endpoint.warn.noLocalAddr=Unable to dete
 endpoint.warn.noLocalName=Unable to determine local host name for socket [{0}]
 endpoint.warn.noLocalPort=Unable to determine local port for socket [{0}]
 endpoint.warn.noSendfileWithSSL=Sendfile is not supported for the connector when SSL is enabled
+endpoint.warn.noUtilityExecutor=No utility executor was set, creating one
 endpoint.warn.incorrectConnectionCount=Incorrect connection count, multiple socket.close called on the same socket.
 endpoint.debug.channelCloseFail=Failed to close channel
 endpoint.debug.destroySocket=Destroying socket [{0}]

Modified: tomcat/trunk/webapps/docs/changelog.xml
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/webapps/docs/changelog.xml?rev=1846122&r1=1846121&r2=1846122&view=diff
==============================================================================
--- tomcat/trunk/webapps/docs/changelog.xml (original)
+++ tomcat/trunk/webapps/docs/changelog.xml Thu Nov  8 10:51:37 2018
@@ -77,6 +77,14 @@
       </update>
     </changelog>
   </subsection>
+  <subsection name="Coyote">
+    <changelog>
+      <update>
+        Refactor connector async timeout threads using a scheduled executor.
+        (remm)
+      </update>
+    </changelog>
+  </subsection>
   <subsection name="Tribes">
     <changelog>
       <update>



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to