Author: markt
Date: Tue Oct 20 12:11:18 2015
New Revision: 1709578
URL: http://svn.apache.org/viewvc?rev=1709578&view=rev
Log:
Refactor async timeouts
Move async timeout thread to Protocol
Move tracking of timeout setting to Processor
Move tracking of last asycn start to AsyncStateMachine
Modified:
tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java
tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java
tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java
tomcat/trunk/java/org/apache/coyote/Processor.java
tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java
tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorBase.java
tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java
tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java
Modified: tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java?rev=1709578&r1=1709577&r2=1709578&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java Tue Oct 20
12:11:18 2015
@@ -40,6 +40,7 @@ public abstract class AbstractProcessor
protected Adapter adapter;
protected final AsyncStateMachine asyncStateMachine;
+ private volatile long asyncTimeout = -1;
protected final AbstractEndpoint<?> endpoint;
protected final Request request;
protected final Response response;
@@ -242,6 +243,39 @@ public abstract class AbstractProcessor
}
+ @Override
+ public void timeoutAsync(long now) {
+ if (now < 0) {
+ doTimeoutAsync();
+ } else {
+ long asyncTimeout = getAsyncTimeout();
+ if (asyncTimeout > 0) {
+ long asyncStart = asyncStateMachine.getLastAsyncStart();
+ if ((now - asyncStart) > asyncTimeout) {
+ doTimeoutAsync();
+ }
+ }
+ }
+ }
+
+
+ private void doTimeoutAsync() {
+ // Avoid multiple timeouts
+ setAsyncTimeout(-1);
+ socketWrapper.processSocket(SocketStatus.TIMEOUT, true);
+ }
+
+
+ public void setAsyncTimeout(long timeout) {
+ asyncTimeout = timeout;
+ }
+
+
+ public long getAsyncTimeout() {
+ return asyncTimeout;
+ }
+
+
@Override
public void recycle() {
errorState = ErrorState.NONE;
Modified: tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java?rev=1709578&r1=1709577&r2=1709578&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java (original)
+++ tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java Tue Oct 20
12:11:18 2015
@@ -19,6 +19,8 @@ package org.apache.coyote;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
@@ -86,9 +88,20 @@ public abstract class AbstractProtocol<S
*/
private final AbstractEndpoint<S> endpoint;
+
private Handler<S> handler;
+ private final Set<Processor> waitingProcessors =
+ Collections.newSetFromMap(new ConcurrentHashMap<Processor,
Boolean>());
+
+
+ /**
+ * The async timeout thread.
+ */
+ private AsyncTimeout asyncTimeout = null;
+
+
public AbstractProtocol(AbstractEndpoint<S> endpoint) {
this.endpoint = endpoint;
setSoLinger(Constants.DEFAULT_CONNECTION_LINGER);
@@ -182,6 +195,11 @@ public abstract class AbstractProtocol<S
}
+ public AsyncTimeout getAsyncTimeout() {
+ return asyncTimeout;
+ }
+
+
// ---------------------- Properties that are passed through to the
EndPoint
@Override
@@ -338,6 +356,16 @@ public abstract class AbstractProtocol<S
}
+ public void addWaitingProcessor(Processor processor) {
+ waitingProcessors.add(processor);
+ }
+
+
+ public void removeWaitingProcessor(Processor processor) {
+ waitingProcessors.remove(processor);
+ }
+
+
// ----------------------------------------------- Accessors for
sub-classes
protected AbstractEndpoint<S> getEndpoint() {
@@ -514,6 +542,14 @@ public abstract class AbstractProtocol<S
getName()), ex);
throw ex;
}
+
+
+ // Start async timeout thread
+ asyncTimeout = new AsyncTimeout();
+ Thread timeoutThread = new Thread(asyncTimeout, getName() +
"-AsyncTimeout");
+ timeoutThread.setPriority(endpoint.getThreadPriority());
+ timeoutThread.setDaemon(true);
+ timeoutThread.start();
}
@@ -551,6 +587,9 @@ public abstract class AbstractProtocol<S
if(getLog().isInfoEnabled())
getLog().info(sm.getString("abstractProtocolHandler.stop",
getName()));
+
+ asyncTimeout.stop();
+
try {
endpoint.stop();
} catch (Exception ex) {
@@ -648,7 +687,6 @@ public abstract class AbstractProtocol<S
return SocketState.CLOSED;
}
- wrapper.setAsync(false);
ContainerThreadMarker.set();
try {
@@ -684,6 +722,8 @@ public abstract class AbstractProtocol<S
// Associate the processor with the connection
connections.put(socket, processor);
+ // Make sure an async timeout doesn't fire
+ getProtocol().removeWaitingProcessor(processor);
SocketState state = SocketState.CLOSED;
do {
@@ -719,6 +759,9 @@ public abstract class AbstractProtocol<S
// depend on type of long poll
connections.put(socket, processor);
longPoll(wrapper, processor);
+ if (processor.isAsync()) {
+ getProtocol().addWaitingProcessor(processor);
+ }
} else if (state == SocketState.OPEN) {
// In keep-alive but between requests. OK to recycle
// processor. Continue to poll for the next request.
@@ -791,11 +834,8 @@ public abstract class AbstractProtocol<S
protected void longPoll(SocketWrapperBase<?> socket, Processor
processor) {
- if (processor.isAsync()) {
- // Async
- socket.setAsync(true);
- } else {
- // This branch is currently only used with HTTP
+ if (!processor.isAsync()) {
+ // This is currently only used with HTTP
// Either:
// - this is an upgraded connection
// - the request line/headers have not been completely
@@ -964,4 +1004,53 @@ public abstract class AbstractProtocol<S
size.set(0);
}
}
+
+
+ /**
+ * Async timeout thread
+ */
+ protected class AsyncTimeout implements Runnable {
+
+ private volatile boolean asyncTimeoutRunning = true;
+
+ /**
+ * The background thread that checks async requests and fires the
+ * timeout if there has been no activity.
+ */
+ @Override
+ public void run() {
+
+ // Loop until we receive a shutdown command
+ while (asyncTimeoutRunning) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ long now = System.currentTimeMillis();
+ for (Processor processor : waitingProcessors) {
+ processor.timeoutAsync(now);
+ }
+
+ // Loop if endpoint is paused
+ while (endpoint.isPaused() && asyncTimeoutRunning) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ }
+ }
+ }
+
+
+ protected void stop() {
+ asyncTimeoutRunning = false;
+
+ // Timeout any pending async request
+ for (Processor processor : waitingProcessors) {
+ processor.timeoutAsync(-1);
+ }
+ }
+ }
}
Modified: tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java?rev=1709578&r1=1709577&r2=1709578&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java [UTF-8]
(original)
+++ tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java [UTF-8] Tue Oct
20 12:11:18 2015
@@ -154,6 +154,7 @@ public class AsyncStateMachine {
private volatile AsyncState state = AsyncState.DISPATCHED;
+ private volatile long lastAsyncStart = 0;
// Need this to fire listener on complete
private AsyncContextCallback asyncCtxt = null;
private final AbstractProcessor processor;
@@ -188,10 +189,22 @@ public class AsyncStateMachine {
return state.isCompleting();
}
+ /**
+ * Obtain the time that this connection last transitioned to async
+ * processing.
+ *
+ * @return The time (as returned by {@link System#currentTimeMillis()})
that
+ * this connection last transitioned to async
+ */
+ public long getLastAsyncStart() {
+ return lastAsyncStart;
+ }
+
public synchronized void asyncStart(AsyncContextCallback asyncCtxt) {
if (state == AsyncState.DISPATCHED) {
state = AsyncState.STARTING;
this.asyncCtxt = asyncCtxt;
+ lastAsyncStart = System.currentTimeMillis();
} else {
throw new IllegalStateException(
sm.getString("asyncStateMachine.invalidAsyncState",
Modified: tomcat/trunk/java/org/apache/coyote/Processor.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/Processor.java?rev=1709578&r1=1709577&r2=1709578&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/Processor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/Processor.java Tue Oct 20 12:11:18 2015
@@ -49,8 +49,19 @@ public interface Processor {
HttpUpgradeHandler getHttpUpgradeHandler();
- boolean isAsync();
boolean isUpgrade();
+ boolean isAsync();
+
+ /**
+ * Check this processor to see if the async timeout has expired and process
+ * a timeout if that is that case.
+ *
+ * @param now The time (as returned by {@link System#currentTimeMillis()}
to
+ * use as the current time to determine whether the async
timeout
+ * has expired. If negative, the timeout will always be treated
+ * as if it has expired.
+ */
+ void timeoutAsync(long now);
Request getRequest();
Modified: tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java?rev=1709578&r1=1709577&r2=1709578&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java Tue Oct 20
12:11:18 2015
@@ -517,7 +517,7 @@ public class AjpProcessor extends Abstra
case ASYNC_SETTIMEOUT: {
if (param == null) return;
long timeout = ((Long)param).longValue();
- socketWrapper.setAsyncTimeout(timeout);
+ setAsyncTimeout(timeout);
break;
}
case ASYNC_TIMEOUT: {
Modified: tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java?rev=1709578&r1=1709577&r2=1709578&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java Tue Oct 20
12:11:18 2015
@@ -742,7 +742,7 @@ public class Http11Processor extends Abs
return;
}
long timeout = ((Long) param).longValue();
- socketWrapper.setAsyncTimeout(timeout);
+ setAsyncTimeout(timeout);
break;
}
case ASYNC_DISPATCH: {
Modified:
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorBase.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorBase.java?rev=1709578&r1=1709577&r2=1709578&view=diff
==============================================================================
---
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorBase.java
(original)
+++
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorBase.java
Tue Oct 20 12:11:18 2015
@@ -95,4 +95,10 @@ public abstract class UpgradeProcessorBa
public ByteBuffer getLeftoverInput() {
return null;
}
+
+
+ @Override
+ public void timeoutAsync(long now) {
+ // NO-OP
+ }
}
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java?rev=1709578&r1=1709577&r2=1709578&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java
(original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java Tue Oct
20 12:11:18 2015
@@ -20,11 +20,9 @@ import java.io.OutputStreamWriter;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
@@ -134,62 +132,8 @@ public abstract class AbstractEndpoint<S
private static final int MAX_ERROR_DELAY = 1600;
- /**
- * Async timeout thread
- */
- protected class AsyncTimeout implements Runnable {
-
- private volatile boolean asyncTimeoutRunning = true;
-
- /**
- * The background thread that checks async requests and fires the
- * timeout if there has been no activity.
- */
- @Override
- public void run() {
-
- // Loop until we receive a shutdown command
- while (asyncTimeoutRunning) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- // Ignore
- }
- long now = System.currentTimeMillis();
- for (SocketWrapperBase<S> socket : waitingRequests) {
- long asyncTimeout = socket.getAsyncTimeout();
- if (asyncTimeout > 0) {
- long asyncStart = socket.getLastAsyncStart();
- if ((now - asyncStart) > asyncTimeout) {
- // Avoid multiple timeouts
- socket.setAsyncTimeout(-1);
- processSocket(socket, SocketStatus.TIMEOUT, true);
- }
- }
- }
-
- // Loop if endpoint is paused
- while (paused && asyncTimeoutRunning) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- // Ignore
- }
- }
-
- }
- }
-
-
- protected void stop() {
- asyncTimeoutRunning = false;
- }
- }
-
-
// ----------------------------------------------------------------- Fields
-
/**
* Running state of the endpoint.
*/
@@ -962,25 +906,6 @@ public abstract class AbstractEndpoint<S
} else {
return MAX_ERROR_DELAY;
}
-
- }
-
-
- protected final Set<SocketWrapperBase<S>> waitingRequests = Collections
- .newSetFromMap(new ConcurrentHashMap<SocketWrapperBase<S>,
Boolean>());
- public void removeWaitingRequest(SocketWrapperBase<S> socketWrapper) {
- waitingRequests.remove(socketWrapper);
- }
-
- /**
- * The async timeout thread.
- */
- private AsyncTimeout asyncTimeout = null;
- public AsyncTimeout getAsyncTimeout() {
- return asyncTimeout;
- }
- public void setAsyncTimeout(AsyncTimeout asyncTimeout) {
- this.asyncTimeout = asyncTimeout;
}
}
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java?rev=1709578&r1=1709577&r2=1709578&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java Tue Oct 20
12:11:18 2015
@@ -598,13 +598,6 @@ public class AprEndpoint extends Abstrac
}
startAcceptorThreads();
-
- // Start async timeout thread
- setAsyncTimeout(new AsyncTimeout());
- Thread timeoutThread = new Thread(getAsyncTimeout(), getName() +
"-AsyncTimeout");
- timeoutThread.setPriority(threadPriority);
- timeoutThread.setDaemon(true);
- timeoutThread.start();
}
}
@@ -629,7 +622,6 @@ public class AprEndpoint extends Abstrac
// Ignore
}
}
- getAsyncTimeout().stop();
for (AbstractEndpoint.Acceptor acceptor : acceptors) {
long waitLeft = 10000;
while (waitLeft > 0 &&
@@ -874,7 +866,6 @@ public class AprEndpoint extends Abstrac
// result of calling AsyncContext.dispatch() from a non-container
// thread
synchronized (socket) {
- waitingRequests.remove(socket);
SocketProcessor proc = new SocketProcessor(socket, status);
Executor executor = getExecutor();
if (dispatch && executor != null) {
@@ -1724,45 +1715,38 @@ public class AprEndpoint extends Abstrac
if (((desc[n*2] & Poll.APR_POLLHUP) ==
Poll.APR_POLLHUP)
|| ((desc[n*2] & Poll.APR_POLLERR) ==
Poll.APR_POLLERR)
|| ((desc[n*2] & Poll.APR_POLLNVAL) ==
Poll.APR_POLLNVAL)) {
- if (wrapper.isAsync() ||
wrapper.isUpgraded()) {
- // Must be using non-blocking IO for
the socket to be in the
- // poller during async processing.
Need to trigger error
- // handling. Poller may return error
codes plus the flags it
- // was waiting for or it may just
return an error code. We
- // could return
ASYNC_[WRITE|READ]_ERROR here but if we do,
- // there will be no exception
associated with the error in
- // application code. By signalling
read/write is possible, a
- // read/write will be attempted, fail
and that will trigger
- // an exception the application will
see.
- // Check the return flags first,
followed by what the socket
- // was registered for
- if ((desc[n*2] & Poll.APR_POLLIN) ==
Poll.APR_POLLIN) {
- // Error probably occurred during
a non-blocking read
- if (!processSocket(desc[n*2+1],
SocketStatus.OPEN_READ)) {
- // Close socket and clear pool
- closeSocket(desc[n*2+1]);
- }
- } else if ((desc[n*2] &
Poll.APR_POLLOUT) == Poll.APR_POLLOUT) {
- // Error probably occurred during
a non-blocking write
- if (!processSocket(desc[n*2+1],
SocketStatus.OPEN_WRITE)) {
- // Close socket and clear pool
- closeSocket(desc[n*2+1]);
- }
- } else if ((wrapper.pollerFlags &
Poll.APR_POLLIN) == Poll.APR_POLLIN) {
- // Can't tell what was happening
when the error occurred but the
- // socket is registered for
non-blocking read so use that
- if (!processSocket(desc[n*2+1],
SocketStatus.OPEN_READ)) {
- // Close socket and clear pool
- closeSocket(desc[n*2+1]);
- }
- } else if ((wrapper.pollerFlags &
Poll.APR_POLLOUT) == Poll.APR_POLLOUT) {
- // Can't tell what was happening
when the error occurred but the
- // socket is registered for
non-blocking write so use that
- if (!processSocket(desc[n*2+1],
SocketStatus.OPEN_WRITE)) {
- // Close socket and clear pool
- closeSocket(desc[n*2+1]);
- }
- } else {
+ // Need to trigger error handling. Poller
may return error
+ // codes plus the flags it was waiting for
or it may just
+ // return an error code. We could handle
the error here but
+ // if we do, there will be no exception
associated with the
+ // error in application code. By
signalling read/write is
+ // possible, a read/write will be
attempted, fail and that
+ // will trigger an exception the
application will see.
+ // Check the return flags first, followed
by what the socket
+ // was registered for
+ if ((desc[n*2] & Poll.APR_POLLIN) ==
Poll.APR_POLLIN) {
+ // Error probably occurred during a
non-blocking read
+ if (!processSocket(desc[n*2+1],
SocketStatus.OPEN_READ)) {
+ // Close socket and clear pool
+ closeSocket(desc[n*2+1]);
+ }
+ } else if ((desc[n*2] & Poll.APR_POLLOUT)
== Poll.APR_POLLOUT) {
+ // Error probably occurred during a
non-blocking write
+ if (!processSocket(desc[n*2+1],
SocketStatus.OPEN_WRITE)) {
+ // Close socket and clear pool
+ closeSocket(desc[n*2+1]);
+ }
+ } else if ((wrapper.pollerFlags &
Poll.APR_POLLIN) == Poll.APR_POLLIN) {
+ // Can't tell what was happening when
the error occurred but the
+ // socket is registered for
non-blocking read so use that
+ if (!processSocket(desc[n*2+1],
SocketStatus.OPEN_READ)) {
+ // Close socket and clear pool
+ closeSocket(desc[n*2+1]);
+ }
+ } else if ((wrapper.pollerFlags &
Poll.APR_POLLOUT) == Poll.APR_POLLOUT) {
+ // Can't tell what was happening when
the error occurred but the
+ // socket is registered for
non-blocking write so use that
+ if (!processSocket(desc[n*2+1],
SocketStatus.OPEN_WRITE)) {
// Close socket and clear pool
closeSocket(desc[n*2+1]);
}
@@ -2272,10 +2256,6 @@ public class AprEndpoint extends Abstrac
// Close socket and pool
closeSocket(socket.getSocket().longValue());
socket = null;
- } else if (state == Handler.SocketState.LONG) {
- if (socket.isAsync()) {
- waitingRequests.add(socket);
- }
}
}
}
@@ -2317,10 +2297,6 @@ public class AprEndpoint extends Abstrac
if (state == Handler.SocketState.CLOSED) {
// Close socket and pool
closeSocket(socket.getSocket().longValue());
- } else if (state == Handler.SocketState.LONG) {
- if (socket.isAsync()) {
- waitingRequests.add(socket);
- }
} else if (state == Handler.SocketState.ASYNC_END) {
SocketProcessor proc = new SocketProcessor(socket,
SocketStatus.OPEN_READ);
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java?rev=1709578&r1=1709577&r2=1709578&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Tue Oct 20
12:11:18 2015
@@ -230,12 +230,6 @@ public class Nio2Endpoint extends Abstra
initializeConnectionLatch();
startAcceptorThreads();
-
- setAsyncTimeout(new AsyncTimeout());
- Thread timeoutThread = new Thread(getAsyncTimeout(), getName() +
"-AsyncTimeout");
- timeoutThread.setPriority(threadPriority);
- timeoutThread.setDaemon(true);
- timeoutThread.start();
}
}
@@ -251,17 +245,12 @@ public class Nio2Endpoint extends Abstra
}
if (running) {
running = false;
- getAsyncTimeout().stop();
unlockAccept();
// Use the executor to avoid binding the main thread if something
bad
// occurs and unbind will also wait for a bit for it to complete
getExecutor().execute(new Runnable() {
@Override
public void run() {
- // Timeout any pending async request
- for (SocketWrapperBase<Nio2Channel> socket :
waitingRequests) {
- processSocket(socket, SocketStatus.TIMEOUT, false);
- }
// Then close all active connections if any remain
try {
handler.closeAll();
@@ -393,7 +382,6 @@ public class Nio2Endpoint extends Abstra
protected boolean processSocket0(SocketWrapperBase<Nio2Channel>
socketWrapper, SocketStatus status, boolean dispatch) {
try {
- waitingRequests.remove(socketWrapper);
SocketProcessor sc = processorCache.pop();
if (sc == null) {
sc = new SocketProcessor(socketWrapper, status);
@@ -1688,10 +1676,6 @@ public class Nio2Endpoint extends Abstra
socket.getSocket().free();
}
}
- } else if (state == Handler.SocketState.LONG) {
- if (socket.isAsync()) {
- waitingRequests.add(socket);
- }
} else if (state == SocketState.UPGRADING) {
socket.setKeptAlive(true);
launch = true;
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=1709578&r1=1709577&r2=1709578&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Tue Oct 20
12:11:18 2015
@@ -1068,45 +1068,31 @@ public class NioEndpoint extends Abstrac
NioSocketWrapper ka = (NioSocketWrapper)
key.attachment();
if ( ka == null ) {
cancelledKey(key); //we don't support any keys
without attachments
+ } else if (close) {
+ key.interestOps(0);
+ ka.interestOps(0); //avoid duplicate stop calls
+ processKey(key,ka);
} else if ((ka.interestOps()&SelectionKey.OP_READ) ==
SelectionKey.OP_READ ||
(ka.interestOps()&SelectionKey.OP_WRITE) ==
SelectionKey.OP_WRITE) {
- if (close) {
- key.interestOps(0);
- ka.interestOps(0); //avoid duplicate stop calls
- processKey(key,ka);
- } else {
- boolean isTimedOut = false;
- // Check for read timeout
- if ((ka.interestOps() & SelectionKey.OP_READ)
== SelectionKey.OP_READ) {
- long delta = now - ka.getLastRead();
- long timeout = ka.getReadTimeout();
- isTimedOut = timeout > 0 && delta >
timeout;
- }
- // Check for write timeout
- if (!isTimedOut && (ka.interestOps() &
SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) {
- long delta = now - ka.getLastWrite();
- long timeout = ka.getWriteTimeout();
- isTimedOut = timeout > 0 && delta >
timeout;
- }
- if (isTimedOut) {
- key.interestOps(0);
- ka.interestOps(0); //avoid duplicate
timeout calls
- cancelledKey(key);
- }
+ boolean isTimedOut = false;
+ // Check for read timeout
+ if ((ka.interestOps() & SelectionKey.OP_READ) ==
SelectionKey.OP_READ) {
+ long delta = now - ka.getLastRead();
+ long timeout = ka.getReadTimeout();
+ isTimedOut = timeout > 0 && delta > timeout;
+ }
+ // Check for write timeout
+ if (!isTimedOut && (ka.interestOps() &
SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) {
+ long delta = now - ka.getLastWrite();
+ long timeout = ka.getWriteTimeout();
+ isTimedOut = timeout > 0 && delta > timeout;
}
- } else if (ka.isAsync()) {
- if (close) {
+ if (isTimedOut) {
key.interestOps(0);
- ka.interestOps(0); //avoid duplicate stop calls
- processKey(key,ka);
- } else if (ka.getAsyncTimeout() > 0) {
- if ((now - ka.getLastAsyncStart()) >
ka.getAsyncTimeout()) {
- // Prevent subsequent timeouts if the
timeout event takes a while to process
- ka.setAsyncTimeout(0);
- processSocket(ka, SocketStatus.TIMEOUT,
true);
- }
+ ka.interestOps(0); //avoid duplicate timeout
calls
+ cancelledKey(key);
}
- }//end if
+ }
}catch ( CancelledKeyException ckx ) {
cancelledKey(key);
}
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java?rev=1709578&r1=1709577&r2=1709578&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java
(original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java Tue Oct
20 12:11:18 2015
@@ -38,13 +38,10 @@ public abstract class SocketWrapperBase<
// Volatile because I/O and setting the timeout values occurs on a
different
// thread to the thread checking the timeout.
- private volatile long lastAsyncStart = 0;
- private volatile long asyncTimeout = -1;
private volatile long readTimeout = -1;
private volatile long writeTimeout = -1;
private volatile int keepAliveLeft = 100;
- private volatile boolean async = false;
private boolean keptAlive = false;
private volatile boolean upgraded = false;
private boolean secure = false;
@@ -111,40 +108,6 @@ public abstract class SocketWrapperBase<
return endpoint;
}
- public boolean isAsync() { return async; }
- /**
- * Sets the async flag for this connection. If this call causes the
- * connection to transition from non-async to async then the lastAsyncStart
- * property will be set using the current time. This property is used as
the
- * start time when calculating the async timeout. As per the Servlet spec
- * the async timeout applies once the dispatch where startAsync() was
called
- * has returned to the container (which is when this method is currently
- * called).
- *
- * @param async The new value of for the async flag
- */
- public void setAsync(boolean async) {
- if (!this.async && async) {
- lastAsyncStart = System.currentTimeMillis();
- }
- this.async = async;
- }
- /**
- * Obtain the time that this connection last transitioned to async
- * processing.
- *
- * @return The time (as returned by {@link System#currentTimeMillis()})
that
- * this connection last transitioned to async
- */
- public long getLastAsyncStart() {
- return lastAsyncStart;
- }
- public void setAsyncTimeout(long timeout) {
- asyncTimeout = timeout;
- }
- public long getAsyncTimeout() {
- return asyncTimeout;
- }
public boolean isUpgraded() { return upgraded; }
public void setUpgraded(boolean upgraded) { this.upgraded = upgraded; }
public boolean isSecure() { return secure; }
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]