Author: markt
Date: Wed Oct 30 11:20:08 2013
New Revision: 1537041
URL: http://svn.apache.org/r1537041
Log:
Fix https://issues.apache.org/bugzilla/show_bug.cgi?id=55715
Add a per web application executor to the WebSocket implementation and use it
for calling SendHandler.onResult() when there is a chance that the current
thread also initiated the write
Modified:
tomcat/trunk/java/org/apache/tomcat/websocket/server/Constants.java
tomcat/trunk/java/org/apache/tomcat/websocket/server/WsContextListener.java
tomcat/trunk/java/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.java
tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java
tomcat/trunk/java/org/apache/tomcat/websocket/server/WsServerContainer.java
tomcat/trunk/java/org/apache/tomcat/websocket/server/WsWriteTimeout.java
tomcat/trunk/webapps/docs/web-socket-howto.xml
Modified: tomcat/trunk/java/org/apache/tomcat/websocket/server/Constants.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/Constants.java?rev=1537041&r1=1537040&r2=1537041&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/server/Constants.java
(original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/server/Constants.java Wed Oct
30 11:20:08 2013
@@ -31,6 +31,14 @@ public class Constants {
public static final String
ENFORCE_NO_ADD_AFTER_HANDSHAKE_CONTEXT_INIT_PARAM =
"org.apache.tomcat.websocket.noAddAfterHandshake";
+ // Executor configuration
+ public static final String EXECUTOR_CORE_SIZE_INIT_PARAM =
+ "org.apache.tomcat.websocket.executorCoreSize";
+ public static final String EXECUTOR_MAX_SIZE_INIT_PARAM =
+ "org.apache.tomcat.websocket.executorMaxSize";
+ public static final String EXECUTOR_KEEPALIVETIME_SECONDS_INIT_PARAM =
+ "org.apache.tomcat.websocket.executorKeepAliveTimeSeconds";
+
public static final String SERVER_CONTAINER_SERVLET_CONTEXT_ATTRIBUTE =
"javax.websocket.server.ServerContainer";
Modified:
tomcat/trunk/java/org/apache/tomcat/websocket/server/WsContextListener.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/WsContextListener.java?rev=1537041&r1=1537040&r2=1537041&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/server/WsContextListener.java
(original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/server/WsContextListener.java
Wed Oct 30 11:20:08 2013
@@ -45,6 +45,7 @@ public class WsContextListener implement
ServletContext sc = sce.getServletContext();
Object obj =
sc.getAttribute(Constants.SERVER_CONTAINER_SERVLET_CONTEXT_ATTRIBUTE);
if (obj instanceof WsServerContainer) {
+ ((WsServerContainer) obj).shutdownExecutor();
((WsServerContainer) obj).destroy();
}
}
Modified:
tomcat/trunk/java/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.java?rev=1537041&r1=1537040&r2=1537041&view=diff
==============================================================================
---
tomcat/trunk/java/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.java
(original)
+++
tomcat/trunk/java/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.java
Wed Oct 30 11:20:08 2013
@@ -233,7 +233,9 @@ public class WsHttpUpgradeHandler implem
@Override
public void onWritePossible() {
- wsRemoteEndpointServer.onWritePossible();
+ // Triggered by the poller so this isn't the same thread that
+ // triggered the write so no need for a dispatch
+ wsRemoteEndpointServer.onWritePossible(false);
}
Modified:
tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java?rev=1537041&r1=1537040&r2=1537041&view=diff
==============================================================================
---
tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java
(original)
+++
tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java
Wed Oct 30 11:20:08 2013
@@ -20,6 +20,9 @@ import java.io.EOFException;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
import javax.servlet.ServletOutputStream;
import javax.websocket.SendHandler;
@@ -42,8 +45,12 @@ public class WsRemoteEndpointImplServer
private static final Log log =
LogFactory.getLog(WsHttpUpgradeHandler.class);
+ private static final Queue<OnResultRunnable> onResultRunnables =
+ new ConcurrentLinkedQueue<>();
+
private final ServletOutputStream sos;
private final WsWriteTimeout wsWriteTimeout;
+ private final ExecutorService executorService;
private volatile SendHandler handler = null;
private volatile ByteBuffer[] buffers = null;
@@ -55,6 +62,7 @@ public class WsRemoteEndpointImplServer
WsServerContainer serverContainer) {
this.sos = sos;
this.wsWriteTimeout = serverContainer.getTimeout();
+ this.executorService = serverContainer.getExecutorService();
}
@@ -68,11 +76,13 @@ public class WsRemoteEndpointImplServer
protected void doWrite(SendHandler handler, ByteBuffer... buffers) {
this.handler = handler;
this.buffers = buffers;
- onWritePossible();
+ // This is definitely the same thread that triggered the write so a
+ // dispatch will be required.
+ onWritePossible(true);
}
- public void onWritePossible() {
+ public void onWritePossible(boolean useDispatch) {
boolean complete = true;
try {
// If this is false there will be a call back when it is true
@@ -89,7 +99,7 @@ public class WsRemoteEndpointImplServer
}
if (complete) {
wsWriteTimeout.unregister(this);
- clearHandler(null);
+ clearHandler(null, useDispatch);
if (close) {
close();
}
@@ -99,7 +109,7 @@ public class WsRemoteEndpointImplServer
} catch (IOException ioe) {
wsWriteTimeout.unregister(this);
- clearHandler(ioe);
+ clearHandler(ioe, useDispatch);
close();
}
if (!complete) {
@@ -118,7 +128,11 @@ public class WsRemoteEndpointImplServer
@Override
protected void doClose() {
if (handler != null) {
- clearHandler(new EOFException());
+ // close() can be triggered by a wide range of scenarios. It is far
+ // simpler just to always use a dispatch that it is to try and
track
+ // whether or not this method was called by the same thread that
+ // triggered the write
+ clearHandler(new EOFException(), true);
}
try {
sos.close();
@@ -136,15 +150,31 @@ public class WsRemoteEndpointImplServer
}
- protected void onTimeout() {
+ /*
+ * Currently this is only called from the background thread so we could
just
+ * call clearHandler() with useDispatch == false but the method parameter
+ * was added in case other callers started to use this method to make sure
+ * that those callers think through what the correct value of useDispatch
is
+ * for them.
+ */
+ protected void onTimeout(boolean useDispatch) {
if (handler != null) {
- clearHandler(new SocketTimeoutException());
+ clearHandler(new SocketTimeoutException(), useDispatch);
}
close();
}
- private void clearHandler(Throwable t) {
+ /**
+ *
+ * @param t The throwable associated with any error that
+ * occurred
+ * @param useDispatch Should {@link SendHandler#onResult(SendResult)} be
+ * called from a new thread, keeping in mind the
+ * requirements of
+ * {@link javax.websocket.RemoteEndpoint.Async}
+ */
+ private void clearHandler(Throwable t, boolean useDispatch) {
// Setting the result marks this (partial) message as
// complete which means the next one may be sent which
// could update the value of the handler. Therefore, keep a
@@ -153,11 +183,64 @@ public class WsRemoteEndpointImplServer
SendHandler sh = handler;
handler = null;
if (sh != null) {
+ if (useDispatch) {
+ OnResultRunnable r = onResultRunnables.poll();
+ if (r == null) {
+ r = new OnResultRunnable(onResultRunnables);
+ }
+ r.init(sh, t);
+ if (executorService == null || executorService.isShutdown()) {
+ // Can't use the executor so call the runnable directly.
+ // This may not be strictly specification compliant in all
+ // cases but during shutdown only close messages are going
+ // to be sent so there should not be the issue of nested
+ // calls leading to stack overflow as described in bug
+ // 55715. The issues with nested calls was the reason for
+ // the separate thread requirement in the specification.
+ r.run();
+ } else {
+ executorService.execute(r);
+ }
+ } else {
+ if (t == null) {
+ sh.onResult(new SendResult());
+ } else {
+ sh.onResult(new SendResult(t));
+ }
+ }
+ }
+ }
+
+
+ private static class OnResultRunnable implements Runnable {
+
+ private final Queue<OnResultRunnable> queue;
+
+ private volatile SendHandler sh;
+ private volatile Throwable t;
+
+ private OnResultRunnable(Queue<OnResultRunnable> queue) {
+ this.queue = queue;
+ }
+
+ private void init(SendHandler sh, Throwable t) {
+ this.sh = sh;
+ this.t = t;
+ }
+
+ @Override
+ public void run() {
if (t == null) {
sh.onResult(new SendResult());
} else {
sh.onResult(new SendResult(t));
}
+ t = null;
+ sh = null;
+ // Return the Runnable to the queue when it has been finished with
+ // Note if this method takes an age to finish there shouldn't be
any
+ // thread safety issues as the fields are cleared above.
+ queue.add(this);
}
}
}
Modified:
tomcat/trunk/java/org/apache/tomcat/websocket/server/WsServerContainer.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/WsServerContainer.java?rev=1537041&r1=1537040&r2=1537041&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/server/WsServerContainer.java
(original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/server/WsServerContainer.java
Wed Oct 30 11:20:08 2013
@@ -26,6 +26,12 @@ import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import javax.servlet.DispatcherType;
import javax.servlet.FilterRegistration;
@@ -81,6 +87,7 @@ public class WsServerContainer extends W
private volatile boolean addAllowed = true;
private final ConcurrentHashMap<String,Set<WsSession>>
authenticatedSessions =
new ConcurrentHashMap<>();
+ private final ExecutorService executorService;
WsServerContainer(ServletContext servletContext) {
@@ -104,6 +111,25 @@ public class WsServerContainer extends W
if (value != null) {
setEnforceNoAddAfterHandshake(Boolean.parseBoolean(value));
}
+ // Executor config
+ int executorCoreSize = 0;
+ int executorMaxSize = 10;
+ long executorKeepAliveTimeSeconds = 60;
+ value = servletContext.getInitParameter(
+ Constants.EXECUTOR_CORE_SIZE_INIT_PARAM);
+ if (value != null) {
+ executorCoreSize = Integer.parseInt(value);
+ }
+ value = servletContext.getInitParameter(
+ Constants.EXECUTOR_MAX_SIZE_INIT_PARAM);
+ if (value != null) {
+ executorMaxSize = Integer.parseInt(value);
+ }
+ value = servletContext.getInitParameter(
+ Constants.EXECUTOR_KEEPALIVETIME_SECONDS_INIT_PARAM);
+ if (value != null) {
+ executorKeepAliveTimeSeconds = Long.parseLong(value);
+ }
FilterRegistration.Dynamic fr = servletContext.addFilter(
WsFilter.class.getName(), new WsFilter());
@@ -113,6 +139,24 @@ public class WsServerContainer extends W
DispatcherType.FORWARD);
fr.addMappingForUrlPatterns(types, true, "/*");
+
+ // Use a per web application executor for any threads the the WebSocket
+ // server code needs to create. Group all of the threads under a single
+ // ThreadGroup.
+ StringBuffer threadGroupName = new StringBuffer("WebSocketServer-");
+ threadGroupName.append(servletContext.getVirtualServerName());
+ threadGroupName.append('-');
+ if ("".equals(servletContext.getContextPath())) {
+ threadGroupName.append("ROOT");
+ } else {
+ threadGroupName.append(servletContext.getContextPath());
+ }
+ ThreadGroup threadGroup = new ThreadGroup(threadGroupName.toString());
+ WsThreadFactory wsThreadFactory = new WsThreadFactory(threadGroup);
+
+ executorService = new ThreadPoolExecutor(executorCoreSize,
+ executorMaxSize, executorKeepAliveTimeSeconds,
TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(), wsThreadFactory);
}
@@ -378,6 +422,21 @@ public class WsServerContainer extends W
}
}
+
+ ExecutorService getExecutorService() {
+ return executorService;
+ }
+
+
+ void shutdownExecutor() {
+ executorService.shutdown();
+ try {
+ executorService.awaitTermination(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ // Ignore the interruption and carry on
+ }
+ }
+
private static void validateEncoders(Class<? extends Encoder>[] encoders)
throws DeploymentException {
@@ -395,6 +454,7 @@ public class WsServerContainer extends W
}
}
+
private static class TemplatePathMatch {
private final ServerEndpointConfig config;
private final UriTemplate uriTemplate;
@@ -441,4 +501,22 @@ public class WsServerContainer extends W
tpm2.getUriTemplate().getNormalizedPath());
}
}
+
+
+ private static class WsThreadFactory implements ThreadFactory {
+
+ private final ThreadGroup tg;
+ private final AtomicLong count = new AtomicLong(0);
+
+ private WsThreadFactory(ThreadGroup tg) {
+ this.tg = tg;
+ }
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(tg, r);
+ t.setName(tg.getName() + "-" + count.incrementAndGet());
+ return t;
+ }
+ }
}
Modified:
tomcat/trunk/java/org/apache/tomcat/websocket/server/WsWriteTimeout.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/WsWriteTimeout.java?rev=1537041&r1=1537040&r2=1537041&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/server/WsWriteTimeout.java
(original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/server/WsWriteTimeout.java
Wed Oct 30 11:20:08 2013
@@ -52,7 +52,9 @@ public class WsWriteTimeout implements B
while (iter.hasNext()) {
WsRemoteEndpointImplServer endpoint = iter.next();
if (endpoint.getTimeoutExpiry() < now) {
- endpoint.onTimeout();
+ // Background thread, not the thread that triggered the
+ // write so no need to use a dispatch
+ endpoint.onTimeout(false);
} else {
// Endpoints are ordered by timeout expiry so if this point
// is reached there is no need to check the remaining
Modified: tomcat/trunk/webapps/docs/web-socket-howto.xml
URL:
http://svn.apache.org/viewvc/tomcat/trunk/webapps/docs/web-socket-howto.xml?rev=1537041&r1=1537040&r2=1537041&view=diff
==============================================================================
--- tomcat/trunk/webapps/docs/web-socket-howto.xml (original)
+++ tomcat/trunk/webapps/docs/web-socket-howto.xml Wed Oct 30 11:20:08 2013
@@ -82,6 +82,25 @@
property to <code>true</code> but any explicit setting on the servlet
context
will always take priority.</p>
+<p>The Java WebSocket 1.0 specification requires that call backs for
+ asynchronous writes are performed on a different thread to the thread that
+ initiated the write. Since the container thread pool is not exposed via the
+ Servlet API, the WebSocket implementation has to provide its own thread
pool.
+ This thread pool is controlled by the following servlet context
+ initialization parameters:<a>
+ <ul>
+ <li><code>org.apache.tomcat.websocket.executorCoreSize</code>: The core
+ size of the executor thread pool. If not set, the default of 0 (zero)
+ is used.</li>
+ <li><code>org.apache.tomcat.websocket.executorMaxSize</code>: The maximum
+ permitted size of the executor thread pool. If not set, the default of
+ 10 is used.</li>
+ <li><code>org.apache.tomcat.websocket.executorKeepAliveTimeSeconds</code>:
+ The maximum time an idle thread will remain in the executor thread
pool
+ until it is terminated. If not specified, the default of 60 seconds is
+ used.</li>
+ </ul>
+
<p>When using the WebSocket client to connect to server endpoints, the timeout
for IO operations while establishing the connection is controlled by the
<code>userProperties</code> of the provided
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]