Author: markt
Date: Tue Oct 20 12:11:18 2015
New Revision: 1709578

URL: http://svn.apache.org/viewvc?rev=1709578&view=rev
Log:
Refactor async timeouts
Move async timeout thread to Protocol
Move tracking of timeout setting to Processor
Move tracking of last async start to AsyncStateMachine

Modified:
    tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java
    tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java
    tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java
    tomcat/trunk/java/org/apache/coyote/Processor.java
    tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java
    tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java
    tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorBase.java
    tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java
    tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
    tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
    tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
    tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java

Modified: tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java?rev=1709578&r1=1709577&r2=1709578&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java Tue Oct 20 
12:11:18 2015
@@ -40,6 +40,7 @@ public abstract class AbstractProcessor
 
     protected Adapter adapter;
     protected final AsyncStateMachine asyncStateMachine;
+    private volatile long asyncTimeout = -1;
     protected final AbstractEndpoint<?> endpoint;
     protected final Request request;
     protected final Response response;
@@ -242,6 +243,39 @@ public abstract class AbstractProcessor
     }
 
 
+    @Override
+    public void timeoutAsync(long now) {
+        if (now < 0) {
+            doTimeoutAsync();
+        } else {
+            long asyncTimeout = getAsyncTimeout();
+            if (asyncTimeout > 0) {
+                long asyncStart = asyncStateMachine.getLastAsyncStart();
+                if ((now - asyncStart) > asyncTimeout) {
+                    doTimeoutAsync();
+                }
+            }
+        }
+    }
+
+
+    private void doTimeoutAsync() {
+        // Avoid multiple timeouts
+        setAsyncTimeout(-1);
+        socketWrapper.processSocket(SocketStatus.TIMEOUT, true);
+    }
+
+
+    public void setAsyncTimeout(long timeout) {
+        asyncTimeout = timeout;
+    }
+
+
+    public long getAsyncTimeout() {
+        return asyncTimeout;
+    }
+
+
     @Override
     public void recycle() {
         errorState = ErrorState.NONE;

Modified: tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java?rev=1709578&r1=1709577&r2=1709578&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java (original)
+++ tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java Tue Oct 20 
12:11:18 2015
@@ -19,6 +19,8 @@ package org.apache.coyote;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -86,9 +88,20 @@ public abstract class AbstractProtocol<S
      */
     private final AbstractEndpoint<S> endpoint;
 
+
     private Handler<S> handler;
 
 
+    private final Set<Processor> waitingProcessors =
+            Collections.newSetFromMap(new ConcurrentHashMap<Processor, 
Boolean>());
+
+
+    /**
+     * The async timeout thread.
+     */
+    private AsyncTimeout asyncTimeout = null;
+
+
     public AbstractProtocol(AbstractEndpoint<S> endpoint) {
         this.endpoint = endpoint;
         setSoLinger(Constants.DEFAULT_CONNECTION_LINGER);
@@ -182,6 +195,11 @@ public abstract class AbstractProtocol<S
     }
 
 
+    public AsyncTimeout getAsyncTimeout() {
+        return asyncTimeout;
+    }
+
+
     // ---------------------- Properties that are passed through to the 
EndPoint
 
     @Override
@@ -338,6 +356,16 @@ public abstract class AbstractProtocol<S
     }
 
 
+    public void addWaitingProcessor(Processor processor) {
+        waitingProcessors.add(processor);
+    }
+
+
+    public void removeWaitingProcessor(Processor processor) {
+        waitingProcessors.remove(processor);
+    }
+
+
     // ----------------------------------------------- Accessors for 
sub-classes
 
     protected AbstractEndpoint<S> getEndpoint() {
@@ -514,6 +542,14 @@ public abstract class AbstractProtocol<S
                     getName()), ex);
             throw ex;
         }
+
+
+        // Start async timeout thread
+        asyncTimeout = new AsyncTimeout();
+        Thread timeoutThread = new Thread(asyncTimeout, getName() + 
"-AsyncTimeout");
+        timeoutThread.setPriority(endpoint.getThreadPriority());
+        timeoutThread.setDaemon(true);
+        timeoutThread.start();
     }
 
 
@@ -551,6 +587,9 @@ public abstract class AbstractProtocol<S
         if(getLog().isInfoEnabled())
             getLog().info(sm.getString("abstractProtocolHandler.stop",
                     getName()));
+
+        asyncTimeout.stop();
+
         try {
             endpoint.stop();
         } catch (Exception ex) {
@@ -648,7 +687,6 @@ public abstract class AbstractProtocol<S
                 return SocketState.CLOSED;
             }
 
-            wrapper.setAsync(false);
             ContainerThreadMarker.set();
 
             try {
@@ -684,6 +722,8 @@ public abstract class AbstractProtocol<S
 
                 // Associate the processor with the connection
                 connections.put(socket, processor);
+                // Make sure an async timeout doesn't fire
+                getProtocol().removeWaitingProcessor(processor);
 
                 SocketState state = SocketState.CLOSED;
                 do {
@@ -719,6 +759,9 @@ public abstract class AbstractProtocol<S
                     // depend on type of long poll
                     connections.put(socket, processor);
                     longPoll(wrapper, processor);
+                    if (processor.isAsync()) {
+                        getProtocol().addWaitingProcessor(processor);
+                    }
                 } else if (state == SocketState.OPEN) {
                     // In keep-alive but between requests. OK to recycle
                     // processor. Continue to poll for the next request.
@@ -791,11 +834,8 @@ public abstract class AbstractProtocol<S
 
 
         protected void longPoll(SocketWrapperBase<?> socket, Processor 
processor) {
-            if (processor.isAsync()) {
-                // Async
-                socket.setAsync(true);
-            } else {
-                // This branch is currently only used with HTTP
+            if (!processor.isAsync()) {
+                // This is currently only used with HTTP
                 // Either:
                 //  - this is an upgraded connection
                 //  - the request line/headers have not been completely
@@ -964,4 +1004,53 @@ public abstract class AbstractProtocol<S
             size.set(0);
         }
     }
+
+
+    /**
+     * Async timeout thread
+     */
+    protected class AsyncTimeout implements Runnable {
+
+        private volatile boolean asyncTimeoutRunning = true;
+
+        /**
+         * The background thread that checks async requests and fires the
+         * timeout if there has been no activity.
+         */
+        @Override
+        public void run() {
+
+            // Loop until we receive a shutdown command
+            while (asyncTimeoutRunning) {
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    // Ignore
+                }
+                long now = System.currentTimeMillis();
+                for (Processor processor : waitingProcessors) {
+                   processor.timeoutAsync(now);
+                }
+
+                // Loop if endpoint is paused
+                while (endpoint.isPaused() && asyncTimeoutRunning) {
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException e) {
+                        // Ignore
+                    }
+                }
+            }
+        }
+
+
+        protected void stop() {
+            asyncTimeoutRunning = false;
+
+            // Timeout any pending async request
+            for (Processor processor : waitingProcessors) {
+                processor.timeoutAsync(-1);
+            }
+        }
+    }
 }

Modified: tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java?rev=1709578&r1=1709577&r2=1709578&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java [UTF-8] 
(original)
+++ tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java [UTF-8] Tue Oct 
20 12:11:18 2015
@@ -154,6 +154,7 @@ public class AsyncStateMachine {
 
 
     private volatile AsyncState state = AsyncState.DISPATCHED;
+    private volatile long lastAsyncStart = 0;
     // Need this to fire listener on complete
     private AsyncContextCallback asyncCtxt = null;
     private final AbstractProcessor processor;
@@ -188,10 +189,22 @@ public class AsyncStateMachine {
         return state.isCompleting();
     }
 
+    /**
+     * Obtain the time that this connection last transitioned to async
+     * processing.
+     *
+     * @return The time (as returned by {@link System#currentTimeMillis()}) 
that
+     *         this connection last transitioned to async
+     */
+    public long getLastAsyncStart() {
+        return lastAsyncStart;
+    }
+
     public synchronized void asyncStart(AsyncContextCallback asyncCtxt) {
         if (state == AsyncState.DISPATCHED) {
             state = AsyncState.STARTING;
             this.asyncCtxt = asyncCtxt;
+            lastAsyncStart = System.currentTimeMillis();
         } else {
             throw new IllegalStateException(
                     sm.getString("asyncStateMachine.invalidAsyncState",

Modified: tomcat/trunk/java/org/apache/coyote/Processor.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/Processor.java?rev=1709578&r1=1709577&r2=1709578&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/Processor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/Processor.java Tue Oct 20 12:11:18 2015
@@ -49,8 +49,19 @@ public interface Processor {
 
     HttpUpgradeHandler getHttpUpgradeHandler();
 
-    boolean isAsync();
     boolean isUpgrade();
+    boolean isAsync();
+
+    /**
+     * Check this processor to see if the async timeout has expired and process
+     * a timeout if that is the case.
+     *
+     * @param now The time (as returned by {@link System#currentTimeMillis()}) 
to
+     *            use as the current time to determine whether the async 
timeout
+     *            has expired. If negative, the timeout will always be treated
+     *            as if it has expired.
+     */
+    void timeoutAsync(long now);
 
     Request getRequest();
 

Modified: tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java?rev=1709578&r1=1709577&r2=1709578&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java Tue Oct 20 
12:11:18 2015
@@ -517,7 +517,7 @@ public class AjpProcessor extends Abstra
         case ASYNC_SETTIMEOUT: {
             if (param == null) return;
             long timeout = ((Long)param).longValue();
-            socketWrapper.setAsyncTimeout(timeout);
+            setAsyncTimeout(timeout);
             break;
         }
         case ASYNC_TIMEOUT: {

Modified: tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java?rev=1709578&r1=1709577&r2=1709578&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java Tue Oct 20 
12:11:18 2015
@@ -742,7 +742,7 @@ public class Http11Processor extends Abs
                 return;
             }
             long timeout = ((Long) param).longValue();
-            socketWrapper.setAsyncTimeout(timeout);
+            setAsyncTimeout(timeout);
             break;
         }
         case ASYNC_DISPATCH: {

Modified: 
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorBase.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorBase.java?rev=1709578&r1=1709577&r2=1709578&view=diff
==============================================================================
--- 
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorBase.java 
(original)
+++ 
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorBase.java 
Tue Oct 20 12:11:18 2015
@@ -95,4 +95,10 @@ public abstract class UpgradeProcessorBa
     public ByteBuffer getLeftoverInput() {
         return null;
     }
+
+
+    @Override
+    public void timeoutAsync(long now) {
+        // NO-OP
+    }
 }

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java?rev=1709578&r1=1709577&r2=1709578&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java 
(original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java Tue Oct 
20 12:11:18 2015
@@ -20,11 +20,9 @@ import java.io.OutputStreamWriter;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
@@ -134,62 +132,8 @@ public abstract class AbstractEndpoint<S
     private static final int MAX_ERROR_DELAY = 1600;
 
 
-    /**
-     * Async timeout thread
-     */
-    protected class AsyncTimeout implements Runnable {
-
-        private volatile boolean asyncTimeoutRunning = true;
-
-        /**
-         * The background thread that checks async requests and fires the
-         * timeout if there has been no activity.
-         */
-        @Override
-        public void run() {
-
-            // Loop until we receive a shutdown command
-            while (asyncTimeoutRunning) {
-                try {
-                    Thread.sleep(1000);
-                } catch (InterruptedException e) {
-                    // Ignore
-                }
-                long now = System.currentTimeMillis();
-                for (SocketWrapperBase<S> socket : waitingRequests) {
-                    long asyncTimeout = socket.getAsyncTimeout();
-                    if (asyncTimeout > 0) {
-                        long asyncStart = socket.getLastAsyncStart();
-                        if ((now - asyncStart) > asyncTimeout) {
-                            // Avoid multiple timeouts
-                            socket.setAsyncTimeout(-1);
-                            processSocket(socket, SocketStatus.TIMEOUT, true);
-                        }
-                    }
-                }
-
-                // Loop if endpoint is paused
-                while (paused && asyncTimeoutRunning) {
-                    try {
-                        Thread.sleep(1000);
-                    } catch (InterruptedException e) {
-                        // Ignore
-                    }
-                }
-
-            }
-        }
-
-
-        protected void stop() {
-            asyncTimeoutRunning = false;
-        }
-    }
-
-
     // ----------------------------------------------------------------- Fields
 
-
     /**
      * Running state of the endpoint.
      */
@@ -962,25 +906,6 @@ public abstract class AbstractEndpoint<S
         } else {
             return MAX_ERROR_DELAY;
         }
-
-    }
-
-
-    protected final Set<SocketWrapperBase<S>> waitingRequests = Collections
-            .newSetFromMap(new ConcurrentHashMap<SocketWrapperBase<S>, 
Boolean>());
-    public void removeWaitingRequest(SocketWrapperBase<S> socketWrapper) {
-        waitingRequests.remove(socketWrapper);
-    }
-
-    /**
-     * The async timeout thread.
-     */
-    private AsyncTimeout asyncTimeout = null;
-    public AsyncTimeout getAsyncTimeout() {
-        return asyncTimeout;
-    }
-    public void setAsyncTimeout(AsyncTimeout asyncTimeout) {
-        this.asyncTimeout = asyncTimeout;
     }
 }
 

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java?rev=1709578&r1=1709577&r2=1709578&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java Tue Oct 20 
12:11:18 2015
@@ -598,13 +598,6 @@ public class AprEndpoint extends Abstrac
             }
 
             startAcceptorThreads();
-
-            // Start async timeout thread
-            setAsyncTimeout(new AsyncTimeout());
-            Thread timeoutThread = new Thread(getAsyncTimeout(), getName() + 
"-AsyncTimeout");
-            timeoutThread.setPriority(threadPriority);
-            timeoutThread.setDaemon(true);
-            timeoutThread.start();
         }
     }
 
@@ -629,7 +622,6 @@ public class AprEndpoint extends Abstrac
                     // Ignore
                 }
             }
-            getAsyncTimeout().stop();
             for (AbstractEndpoint.Acceptor acceptor : acceptors) {
                 long waitLeft = 10000;
                 while (waitLeft > 0 &&
@@ -874,7 +866,6 @@ public class AprEndpoint extends Abstrac
             // result of calling AsyncContext.dispatch() from a non-container
             // thread
             synchronized (socket) {
-                waitingRequests.remove(socket);
                 SocketProcessor proc = new SocketProcessor(socket, status);
                 Executor executor = getExecutor();
                 if (dispatch && executor != null) {
@@ -1724,45 +1715,38 @@ public class AprEndpoint extends Abstrac
                                 if (((desc[n*2] & Poll.APR_POLLHUP) == 
Poll.APR_POLLHUP)
                                         || ((desc[n*2] & Poll.APR_POLLERR) == 
Poll.APR_POLLERR)
                                         || ((desc[n*2] & Poll.APR_POLLNVAL) == 
Poll.APR_POLLNVAL)) {
-                                    if (wrapper.isAsync() || 
wrapper.isUpgraded()) {
-                                        // Must be using non-blocking IO for 
the socket to be in the
-                                        // poller during async processing. 
Need to trigger error
-                                        // handling. Poller may return error 
codes plus the flags it
-                                        // was waiting for or it may just 
return an error code. We
-                                        // could return 
ASYNC_[WRITE|READ]_ERROR here but if we do,
-                                        // there will be no exception 
associated with the error in
-                                        // application code. By signalling 
read/write is possible, a
-                                        // read/write will be attempted, fail 
and that will trigger
-                                        // an exception the application will 
see.
-                                        // Check the return flags first, 
followed by what the socket
-                                        // was registered for
-                                        if ((desc[n*2] & Poll.APR_POLLIN) == 
Poll.APR_POLLIN) {
-                                            // Error probably occurred during 
a non-blocking read
-                                            if (!processSocket(desc[n*2+1], 
SocketStatus.OPEN_READ)) {
-                                                // Close socket and clear pool
-                                                closeSocket(desc[n*2+1]);
-                                            }
-                                        } else if ((desc[n*2] & 
Poll.APR_POLLOUT) == Poll.APR_POLLOUT) {
-                                            // Error probably occurred during 
a non-blocking write
-                                            if (!processSocket(desc[n*2+1], 
SocketStatus.OPEN_WRITE)) {
-                                                // Close socket and clear pool
-                                                closeSocket(desc[n*2+1]);
-                                            }
-                                        } else if ((wrapper.pollerFlags & 
Poll.APR_POLLIN) == Poll.APR_POLLIN) {
-                                            // Can't tell what was happening 
when the error occurred but the
-                                            // socket is registered for 
non-blocking read so use that
-                                            if (!processSocket(desc[n*2+1], 
SocketStatus.OPEN_READ)) {
-                                                // Close socket and clear pool
-                                                closeSocket(desc[n*2+1]);
-                                            }
-                                        } else if ((wrapper.pollerFlags & 
Poll.APR_POLLOUT) == Poll.APR_POLLOUT) {
-                                            // Can't tell what was happening 
when the error occurred but the
-                                            // socket is registered for 
non-blocking write so use that
-                                            if (!processSocket(desc[n*2+1], 
SocketStatus.OPEN_WRITE)) {
-                                                // Close socket and clear pool
-                                                closeSocket(desc[n*2+1]);
-                                            }
-                                        } else {
+                                    // Need to trigger error handling. Poller 
may return error
+                                    // codes plus the flags it was waiting for 
or it may just
+                                    // return an error code. We could handle 
the error here but
+                                    // if we do, there will be no exception 
associated with the
+                                    // error in application code. By 
signalling read/write is
+                                    // possible, a read/write will be 
attempted, fail and that
+                                    // will trigger an exception the 
application will see.
+                                    // Check the return flags first, followed 
by what the socket
+                                    // was registered for
+                                    if ((desc[n*2] & Poll.APR_POLLIN) == 
Poll.APR_POLLIN) {
+                                        // Error probably occurred during a 
non-blocking read
+                                        if (!processSocket(desc[n*2+1], 
SocketStatus.OPEN_READ)) {
+                                            // Close socket and clear pool
+                                            closeSocket(desc[n*2+1]);
+                                        }
+                                    } else if ((desc[n*2] & Poll.APR_POLLOUT) 
== Poll.APR_POLLOUT) {
+                                        // Error probably occurred during a 
non-blocking write
+                                        if (!processSocket(desc[n*2+1], 
SocketStatus.OPEN_WRITE)) {
+                                            // Close socket and clear pool
+                                            closeSocket(desc[n*2+1]);
+                                        }
+                                    } else if ((wrapper.pollerFlags & 
Poll.APR_POLLIN) == Poll.APR_POLLIN) {
+                                        // Can't tell what was happening when 
the error occurred but the
+                                        // socket is registered for 
non-blocking read so use that
+                                        if (!processSocket(desc[n*2+1], 
SocketStatus.OPEN_READ)) {
+                                            // Close socket and clear pool
+                                            closeSocket(desc[n*2+1]);
+                                        }
+                                    } else if ((wrapper.pollerFlags & 
Poll.APR_POLLOUT) == Poll.APR_POLLOUT) {
+                                        // Can't tell what was happening when 
the error occurred but the
+                                        // socket is registered for 
non-blocking write so use that
+                                        if (!processSocket(desc[n*2+1], 
SocketStatus.OPEN_WRITE)) {
                                             // Close socket and clear pool
                                             closeSocket(desc[n*2+1]);
                                         }
@@ -2272,10 +2256,6 @@ public class AprEndpoint extends Abstrac
                         // Close socket and pool
                         closeSocket(socket.getSocket().longValue());
                         socket = null;
-                    } else if (state == Handler.SocketState.LONG) {
-                        if (socket.isAsync()) {
-                            waitingRequests.add(socket);
-                        }
                     }
                 }
             }
@@ -2317,10 +2297,6 @@ public class AprEndpoint extends Abstrac
                 if (state == Handler.SocketState.CLOSED) {
                     // Close socket and pool
                     closeSocket(socket.getSocket().longValue());
-                } else if (state == Handler.SocketState.LONG) {
-                    if (socket.isAsync()) {
-                        waitingRequests.add(socket);
-                    }
                 } else if (state == Handler.SocketState.ASYNC_END) {
                     SocketProcessor proc = new SocketProcessor(socket,
                             SocketStatus.OPEN_READ);

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java?rev=1709578&r1=1709577&r2=1709578&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Tue Oct 20 
12:11:18 2015
@@ -230,12 +230,6 @@ public class Nio2Endpoint extends Abstra
 
             initializeConnectionLatch();
             startAcceptorThreads();
-
-            setAsyncTimeout(new AsyncTimeout());
-            Thread timeoutThread = new Thread(getAsyncTimeout(), getName() + 
"-AsyncTimeout");
-            timeoutThread.setPriority(threadPriority);
-            timeoutThread.setDaemon(true);
-            timeoutThread.start();
         }
     }
 
@@ -251,17 +245,12 @@ public class Nio2Endpoint extends Abstra
         }
         if (running) {
             running = false;
-            getAsyncTimeout().stop();
             unlockAccept();
             // Use the executor to avoid binding the main thread if something 
bad
             // occurs and unbind will also wait for a bit for it to complete
             getExecutor().execute(new Runnable() {
                 @Override
                 public void run() {
-                    // Timeout any pending async request
-                    for (SocketWrapperBase<Nio2Channel> socket : 
waitingRequests) {
-                        processSocket(socket, SocketStatus.TIMEOUT, false);
-                    }
                     // Then close all active connections if any remain
                     try {
                         handler.closeAll();
@@ -393,7 +382,6 @@ public class Nio2Endpoint extends Abstra
 
     protected boolean processSocket0(SocketWrapperBase<Nio2Channel> 
socketWrapper, SocketStatus status, boolean dispatch) {
         try {
-            waitingRequests.remove(socketWrapper);
             SocketProcessor sc = processorCache.pop();
             if (sc == null) {
                 sc = new SocketProcessor(socketWrapper, status);
@@ -1688,10 +1676,6 @@ public class Nio2Endpoint extends Abstra
                                     socket.getSocket().free();
                                 }
                             }
-                        } else if (state == Handler.SocketState.LONG) {
-                            if (socket.isAsync()) {
-                                waitingRequests.add(socket);
-                            }
                         } else if (state == SocketState.UPGRADING) {
                             socket.setKeptAlive(true);
                             launch = true;

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=1709578&r1=1709577&r2=1709578&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Tue Oct 20 
12:11:18 2015
@@ -1068,45 +1068,31 @@ public class NioEndpoint extends Abstrac
                         NioSocketWrapper ka = (NioSocketWrapper) key.attachment();
                         if ( ka == null ) {
                             cancelledKey(key); //we don't support any keys without attachments
+                        } else if (close) {
+                            key.interestOps(0);
+                            ka.interestOps(0); //avoid duplicate stop calls
+                            processKey(key,ka);
                         } else if ((ka.interestOps()&SelectionKey.OP_READ) == SelectionKey.OP_READ ||
                                   (ka.interestOps()&SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) {
-                            if (close) {
-                                key.interestOps(0);
-                                ka.interestOps(0); //avoid duplicate stop calls
-                                processKey(key,ka);
-                            } else {
-                                boolean isTimedOut = false;
-                                // Check for read timeout
-                                if ((ka.interestOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
-                                    long delta = now - ka.getLastRead();
-                                    long timeout = ka.getReadTimeout();
-                                    isTimedOut = timeout > 0 && delta > timeout;
-                                }
-                                // Check for write timeout
-                                if (!isTimedOut && (ka.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) {
-                                    long delta = now - ka.getLastWrite();
-                                    long timeout = ka.getWriteTimeout();
-                                    isTimedOut = timeout > 0 && delta > timeout;
-                                }
-                                if (isTimedOut) {
-                                    key.interestOps(0);
-                                    ka.interestOps(0); //avoid duplicate timeout calls
-                                    cancelledKey(key);
-                                }
+                            boolean isTimedOut = false;
+                            // Check for read timeout
+                            if ((ka.interestOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
+                                long delta = now - ka.getLastRead();
+                                long timeout = ka.getReadTimeout();
+                                isTimedOut = timeout > 0 && delta > timeout;
+                            }
+                            // Check for write timeout
+                            if (!isTimedOut && (ka.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) {
+                                long delta = now - ka.getLastWrite();
+                                long timeout = ka.getWriteTimeout();
+                                isTimedOut = timeout > 0 && delta > timeout;
                             }
-                        } else if (ka.isAsync()) {
-                            if (close) {
+                            if (isTimedOut) {
                                 key.interestOps(0);
-                                ka.interestOps(0); //avoid duplicate stop calls
-                                processKey(key,ka);
-                            } else if (ka.getAsyncTimeout() > 0) {
-                                if ((now - ka.getLastAsyncStart()) > ka.getAsyncTimeout()) {
-                                    // Prevent subsequent timeouts if the timeout event takes a while to process
-                                    ka.setAsyncTimeout(0);
-                                    processSocket(ka, SocketStatus.TIMEOUT, true);
-                                }
+                                ka.interestOps(0); //avoid duplicate timeout calls
+                                cancelledKey(key);
                             }
-                        }//end if
+                        }
                     }catch ( CancelledKeyException ckx ) {
                         cancelledKey(key);
                     }

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java?rev=1709578&r1=1709577&r2=1709578&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java Tue Oct 20 12:11:18 2015
@@ -38,13 +38,10 @@ public abstract class SocketWrapperBase<
 
     // Volatile because I/O and setting the timeout values occurs on a different
     // thread to the thread checking the timeout.
-    private volatile long lastAsyncStart = 0;
-    private volatile long asyncTimeout = -1;
     private volatile long readTimeout = -1;
     private volatile long writeTimeout = -1;
 
     private volatile int keepAliveLeft = 100;
-    private volatile boolean async = false;
     private boolean keptAlive = false;
     private volatile boolean upgraded = false;
     private boolean secure = false;
@@ -111,40 +108,6 @@ public abstract class SocketWrapperBase<
         return endpoint;
     }
 
-    public boolean isAsync() { return async; }
-    /**
-     * Sets the async flag for this connection. If this call causes the
-     * connection to transition from non-async to async then the lastAsyncStart
-     * property will be set using the current time. This property is used as the
-     * start time when calculating the async timeout. As per the Servlet spec
-     * the async timeout applies once the dispatch where startAsync() was called
-     * has returned to the container (which is when this method is currently
-     * called).
-     *
-     * @param async The new value of for the async flag
-     */
-    public void setAsync(boolean async) {
-        if (!this.async && async) {
-            lastAsyncStart = System.currentTimeMillis();
-        }
-        this.async = async;
-    }
-    /**
-     * Obtain the time that this connection last transitioned to async
-     * processing.
-     *
-     * @return The time (as returned by {@link System#currentTimeMillis()}) that
-     *         this connection last transitioned to async
-     */
-    public long getLastAsyncStart() {
-       return lastAsyncStart;
-    }
-    public void setAsyncTimeout(long timeout) {
-        asyncTimeout = timeout;
-    }
-    public long getAsyncTimeout() {
-        return asyncTimeout;
-    }
     public boolean isUpgraded() { return upgraded; }
     public void setUpgraded(boolean upgraded) { this.upgraded = upgraded; }
     public boolean isSecure() { return secure; }



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to