Author: markt Date: Tue Apr 22 12:07:04 2014 New Revision: 1589100 URL: http://svn.apache.org/r1589100 Log: Ensure that threads created to support WebSocket clients are stopped when those clients no longer need them. Note that while this happens automatically for WebSocket client calls made by web applications, stand-along clients must call the Tomcat specific method WsWebSocketContainer.destroy().
Added: tomcat/trunk/java/org/apache/tomcat/websocket/AsyncChannelGroupUtil.java (with props) Modified: tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties tomcat/trunk/java/org/apache/tomcat/websocket/WsWebSocketContainer.java tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java Added: tomcat/trunk/java/org/apache/tomcat/websocket/AsyncChannelGroupUtil.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/AsyncChannelGroupUtil.java?rev=1589100&view=auto ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/websocket/AsyncChannelGroupUtil.java (added) +++ tomcat/trunk/java/org/apache/tomcat/websocket/AsyncChannelGroupUtil.java Tue Apr 22 12:07:04 2014 @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tomcat.websocket; + +import java.io.IOException; +import java.nio.channels.AsynchronousChannelGroup; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.tomcat.util.res.StringManager; +import org.apache.tomcat.util.threads.ThreadPoolExecutor; + +/** + * This is a utility class that enables multiple {@link WsWebSocketContainer} + * instances to share a single {@link AsynchronousChannelGroup} while ensuring + * that the group is destroyed when no longer required. + */ +public class AsyncChannelGroupUtil { + + private static final StringManager sm = + StringManager.getManager(Constants.PACKAGE_NAME); + + private static AsynchronousChannelGroup group = null; + private static int usageCount = 0; + private static final Object lock = new Object(); + + + private AsyncChannelGroupUtil() { + // Hide the default constructor + } + + + public static AsynchronousChannelGroup register() { + synchronized (lock) { + if (usageCount == 0) { + group = createAsynchronousChannelGroup(); + } + usageCount++; + return group; + } + } + + + public static void unregister() { + synchronized (lock) { + usageCount--; + if (usageCount == 0) { + group.shutdown(); + group = null; + } + } + } + + + private static AsynchronousChannelGroup createAsynchronousChannelGroup() { + // Need to do this with the right thread context class loader else the + // first web app to call this will trigger a leak + ClassLoader original = Thread.currentThread().getContextClassLoader(); + + try { + Thread.currentThread().setContextClassLoader( + AsyncIOThreadFactory.class.getClassLoader()); + + // These are the same settings as the default + // AsynchronousChannelGroup + int initialSize = Runtime.getRuntime().availableProcessors(); + ExecutorService executorService = new ThreadPoolExecutor( + 0, + Integer.MAX_VALUE, + Long.MAX_VALUE, TimeUnit.MILLISECONDS, + new SynchronousQueue<Runnable>(), + new AsyncIOThreadFactory()); + + try { + return AsynchronousChannelGroup.withCachedThreadPool( + executorService, initialSize); + } catch (IOException e) { + // No good reason for this to happen. + throw new IllegalStateException(sm.getString("asyncChannelGroup.createFail")); + } + } finally { + Thread.currentThread().setContextClassLoader(original); + } + } + + + private static class AsyncIOThreadFactory implements ThreadFactory { + + private AtomicInteger count = new AtomicInteger(0); + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r); + t.setName("WebSocketClient-AsyncIO-" + count.incrementAndGet()); + t.setContextClassLoader(this.getClass().getClassLoader()); + t.setDaemon(true); + return t; + } + } +} Propchange: tomcat/trunk/java/org/apache/tomcat/websocket/AsyncChannelGroupUtil.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties?rev=1589100&r1=1589099&r2=1589100&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties (original) +++ tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties Tue Apr 22 12:07:04 2014 @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +asyncChannelGroup.createFail=Unable to create dedicated AsynchronousChannelGroup for WebSocket clients which is required to prevent memory leaks in complex class loader environments like J2EE containers + asyncChannelWrapperSecure.closeFail=Failed to close channel cleanly asyncChannelWrapperSecure.concurrentRead=Concurrent read operations are not permitted asyncChannelWrapperSecure.concurrentWrite=Concurrent write operations are not permitted @@ -86,7 +88,6 @@ wsSession.unknownHandlerType=Unable to a # as many as 4 bytes. wsWebSocketContainer.shutdown=The web application is stopping -wsWebSocketContainer.asynchronousChannelGroupFail=Unable to create dedicated AsynchronousChannelGroup for WebSocket clients which is required to prevent memory leaks in complex class loader environments like J2EE containers wsWebSocketContainer.asynchronousSocketChannelFail=Unable to open a connection to the server wsWebSocketContainer.defaultConfiguratorFaill=Failed to create the default configurator wsWebSocketContainer.endpointCreateFail=Failed to create a local endpoint of type [{0}] Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsWebSocketContainer.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsWebSocketContainer.java?rev=1589100&r1=1589099&r2=1589100&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/websocket/WsWebSocketContainer.java (original) +++ tomcat/trunk/java/org/apache/tomcat/websocket/WsWebSocketContainer.java Tue Apr 22 12:07:04 2014 @@ -42,13 +42,9 @@ import java.util.Random; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; @@ -69,7 +65,6 @@ import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; import org.apache.tomcat.util.codec.binary.Base64; import org.apache.tomcat.util.res.StringManager; -import org.apache.tomcat.util.threads.ThreadPoolExecutor; import org.apache.tomcat.websocket.pojo.PojoEndpointClient; public class WsWebSocketContainer @@ -109,43 +104,9 @@ public class WsWebSocketContainer StringManager.getManager(Constants.PACKAGE_NAME); private static final Random random = new Random(); private static final byte[] crlf = new byte[] {13, 10}; - private static final AsynchronousChannelGroup asynchronousChannelGroup; - static { - AsynchronousChannelGroup result = null; - - // Need to do this with the right thread context class loader else the - // first web app to call this will trigger a leak - ClassLoader original = Thread.currentThread().getContextClassLoader(); - - try { - Thread.currentThread().setContextClassLoader( - AsyncIOThreadFactory.class.getClassLoader()); - - // These are the same settings as the default - // AsynchronousChannelGroup - int initialSize = Runtime.getRuntime().availableProcessors(); - ExecutorService executorService = new ThreadPoolExecutor( - 0, - Integer.MAX_VALUE, - Long.MAX_VALUE, TimeUnit.MILLISECONDS, - new SynchronousQueue<Runnable>(), - new AsyncIOThreadFactory()); - - try { - result = AsynchronousChannelGroup.withCachedThreadPool( - executorService, initialSize); - } catch (IOException e) { - // No good reason for this to happen. - throw new IllegalStateException(sm.getString( - "wsWebSocketContainer.asynchronousChannelGroupFail")); - } - } finally { - Thread.currentThread().setContextClassLoader(original); - } - - asynchronousChannelGroup = result; - } + private AsynchronousChannelGroup asynchronousChannelGroup = null; + private final Object asynchronousChannelGroupLock = new Object(); private final Log log = LogFactory.getLog(WsWebSocketContainer.class); private final Map<Class<?>, Set<WsSession>> endpointSessionMap = @@ -285,8 +246,7 @@ public class WsWebSocketContainer AsynchronousSocketChannel socketChannel; try { - socketChannel = - AsynchronousSocketChannel.open(asynchronousChannelGroup); + socketChannel = AsynchronousSocketChannel.open(getAsynchronousChannelGroup()); } catch (IOException ioe) { throw new DeploymentException(sm.getString( "wsWebSocketContainer.asynchronousSocketChannelFail"), ioe); @@ -798,6 +758,33 @@ public class WsWebSocketContainer "wsWebSocketContainer.sessionCloseFail", session.getId()), ioe); } } + + // Only unregister with AsyncChannelGroupUtil if this instance + // registered with it + if (asynchronousChannelGroup != null) { + synchronized (asynchronousChannelGroupLock) { + if (asynchronousChannelGroup != null) { + AsyncChannelGroupUtil.unregister(); + asynchronousChannelGroup = null; + } + } + } + } + + + private AsynchronousChannelGroup getAsynchronousChannelGroup() { + // Use AsyncChannelGroupUtil to share a common group amongst all + // WebSocket clients + AsynchronousChannelGroup result = asynchronousChannelGroup; + if (result == null) { + synchronized (asynchronousChannelGroupLock) { + if (asynchronousChannelGroup == null) { + asynchronousChannelGroup = AsyncChannelGroupUtil.register(); + } + result = asynchronousChannelGroup; + } + } + return result; } @@ -835,23 +822,4 @@ public class WsWebSocketContainer public int getProcessPeriod() { return processPeriod; } - - - /** - * Create threads for AsyncIO that have the right context class loader to - * prevent memory leaks. - */ - private static class AsyncIOThreadFactory implements ThreadFactory { - - private AtomicInteger count = new AtomicInteger(0); - - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r); - t.setName("WebSocketClient-AsyncIO-" + count.incrementAndGet()); - t.setContextClassLoader(this.getClass().getClassLoader()); - t.setDaemon(true); - return t; - } - } } Modified: tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java?rev=1589100&r1=1589099&r2=1589100&view=diff ============================================================================== --- tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java (original) +++ tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java Tue Apr 22 12:07:04 2014 @@ -110,6 +110,8 @@ public class TestWsWebSocketContainer ex Queue<String> messages = handler.getMessages(); Assert.assertEquals(1, messages.size()); Assert.assertEquals(MESSAGE_STRING_1, messages.peek()); + + ((WsWebSocketContainer) wsContainer).destroy(); } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org