Author: markt
Date: Tue Apr 22 12:07:04 2014
New Revision: 1589100

URL: http://svn.apache.org/r1589100
Log:
Ensure that threads created to support WebSocket clients are stopped when those 
clients no longer need them.
Note that while this happens automatically for WebSocket client calls made by 
web applications, stand-along clients must call the Tomcat specific method 
WsWebSocketContainer.destroy().

Added:
    tomcat/trunk/java/org/apache/tomcat/websocket/AsyncChannelGroupUtil.java   
(with props)
Modified:
    tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties
    tomcat/trunk/java/org/apache/tomcat/websocket/WsWebSocketContainer.java
    tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java

Added: tomcat/trunk/java/org/apache/tomcat/websocket/AsyncChannelGroupUtil.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/AsyncChannelGroupUtil.java?rev=1589100&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/AsyncChannelGroupUtil.java 
(added)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/AsyncChannelGroupUtil.java 
Tue Apr 22 12:07:04 2014
@@ -0,0 +1,117 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.tomcat.websocket;
+
+import java.io.IOException;
+import java.nio.channels.AsynchronousChannelGroup;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.tomcat.util.res.StringManager;
+import org.apache.tomcat.util.threads.ThreadPoolExecutor;
+
+/**
+ * This is a utility class that enables multiple {@link WsWebSocketContainer}
+ * instances to share a single {@link AsynchronousChannelGroup} while ensuring
+ * that the group is destroyed when no longer required.
+ */
+public class AsyncChannelGroupUtil {
+
+    private static final StringManager sm =
+            StringManager.getManager(Constants.PACKAGE_NAME);
+
+    private static AsynchronousChannelGroup group = null;
+    private static int usageCount = 0;
+    private static final Object lock = new Object();
+
+
+    private AsyncChannelGroupUtil() {
+        // Hide the default constructor
+    }
+
+
+    public static AsynchronousChannelGroup register() {
+        synchronized (lock) {
+            if (usageCount == 0) {
+                group = createAsynchronousChannelGroup();
+            }
+            usageCount++;
+            return group;
+        }
+    }
+
+
+    public static void unregister() {
+        synchronized (lock) {
+            usageCount--;
+            if (usageCount == 0) {
+                group.shutdown();
+                group = null;
+            }
+        }
+    }
+
+
+    private static AsynchronousChannelGroup createAsynchronousChannelGroup() {
+        // Need to do this with the right thread context class loader else the
+        // first web app to call this will trigger a leak
+        ClassLoader original = Thread.currentThread().getContextClassLoader();
+
+        try {
+            Thread.currentThread().setContextClassLoader(
+                    AsyncIOThreadFactory.class.getClassLoader());
+
+            // These are the same settings as the default
+            // AsynchronousChannelGroup
+            int initialSize = Runtime.getRuntime().availableProcessors();
+            ExecutorService executorService = new ThreadPoolExecutor(
+                    0,
+                    Integer.MAX_VALUE,
+                    Long.MAX_VALUE, TimeUnit.MILLISECONDS,
+                    new SynchronousQueue<Runnable>(),
+                    new AsyncIOThreadFactory());
+
+            try {
+                return AsynchronousChannelGroup.withCachedThreadPool(
+                        executorService, initialSize);
+            } catch (IOException e) {
+                // No good reason for this to happen.
+                throw new 
IllegalStateException(sm.getString("asyncChannelGroup.createFail"));
+            }
+        } finally {
+            Thread.currentThread().setContextClassLoader(original);
+        }
+    }
+
+
+    private static class AsyncIOThreadFactory implements ThreadFactory {
+
+        private AtomicInteger count = new AtomicInteger(0);
+
+        @Override
+        public Thread newThread(Runnable r) {
+            Thread t = new Thread(r);
+            t.setName("WebSocketClient-AsyncIO-" + count.incrementAndGet());
+            t.setContextClassLoader(this.getClass().getClassLoader());
+            t.setDaemon(true);
+            return t;
+        }
+    }
+}

Propchange: 
tomcat/trunk/java/org/apache/tomcat/websocket/AsyncChannelGroupUtil.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties?rev=1589100&r1=1589099&r2=1589100&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties 
(original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties Tue 
Apr 22 12:07:04 2014
@@ -13,6 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+asyncChannelGroup.createFail=Unable to create dedicated 
AsynchronousChannelGroup for WebSocket clients which is required to prevent 
memory leaks in complex class loader environments like J2EE containers
+
 asyncChannelWrapperSecure.closeFail=Failed to close channel cleanly
 asyncChannelWrapperSecure.concurrentRead=Concurrent read operations are not 
permitted
 asyncChannelWrapperSecure.concurrentWrite=Concurrent write operations are not 
permitted
@@ -86,7 +88,6 @@ wsSession.unknownHandlerType=Unable to a
 # as many as 4 bytes.
 wsWebSocketContainer.shutdown=The web application is stopping
 
-wsWebSocketContainer.asynchronousChannelGroupFail=Unable to create dedicated 
AsynchronousChannelGroup for WebSocket clients which is required to prevent 
memory leaks in complex class loader environments like J2EE containers
 wsWebSocketContainer.asynchronousSocketChannelFail=Unable to open a connection 
to the server
 wsWebSocketContainer.defaultConfiguratorFaill=Failed to create the default 
configurator
 wsWebSocketContainer.endpointCreateFail=Failed to create a local endpoint of 
type [{0}]

Modified: 
tomcat/trunk/java/org/apache/tomcat/websocket/WsWebSocketContainer.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsWebSocketContainer.java?rev=1589100&r1=1589099&r2=1589100&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/WsWebSocketContainer.java 
(original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/WsWebSocketContainer.java Tue 
Apr 22 12:07:04 2014
@@ -42,13 +42,9 @@ import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
@@ -69,7 +65,6 @@ import org.apache.juli.logging.Log;
 import org.apache.juli.logging.LogFactory;
 import org.apache.tomcat.util.codec.binary.Base64;
 import org.apache.tomcat.util.res.StringManager;
-import org.apache.tomcat.util.threads.ThreadPoolExecutor;
 import org.apache.tomcat.websocket.pojo.PojoEndpointClient;
 
 public class WsWebSocketContainer
@@ -109,43 +104,9 @@ public class WsWebSocketContainer
             StringManager.getManager(Constants.PACKAGE_NAME);
     private static final Random random = new Random();
     private static final byte[] crlf = new byte[] {13, 10};
-    private static final AsynchronousChannelGroup asynchronousChannelGroup;
 
-    static {
-        AsynchronousChannelGroup result = null;
-
-        // Need to do this with the right thread context class loader else the
-        // first web app to call this will trigger a leak
-        ClassLoader original = Thread.currentThread().getContextClassLoader();
-
-        try {
-            Thread.currentThread().setContextClassLoader(
-                    AsyncIOThreadFactory.class.getClassLoader());
-
-            // These are the same settings as the default
-            // AsynchronousChannelGroup
-            int initialSize = Runtime.getRuntime().availableProcessors();
-            ExecutorService executorService = new ThreadPoolExecutor(
-                    0,
-                    Integer.MAX_VALUE,
-                    Long.MAX_VALUE, TimeUnit.MILLISECONDS,
-                    new SynchronousQueue<Runnable>(),
-                    new AsyncIOThreadFactory());
-
-            try {
-                result = AsynchronousChannelGroup.withCachedThreadPool(
-                        executorService, initialSize);
-            } catch (IOException e) {
-                // No good reason for this to happen.
-                throw new IllegalStateException(sm.getString(
-                        "wsWebSocketContainer.asynchronousChannelGroupFail"));
-            }
-        } finally {
-            Thread.currentThread().setContextClassLoader(original);
-        }
-
-        asynchronousChannelGroup = result;
-    }
+    private AsynchronousChannelGroup asynchronousChannelGroup = null;
+    private final Object asynchronousChannelGroupLock = new Object();
 
     private final Log log = LogFactory.getLog(WsWebSocketContainer.class);
     private final Map<Class<?>, Set<WsSession>> endpointSessionMap =
@@ -285,8 +246,7 @@ public class WsWebSocketContainer
 
         AsynchronousSocketChannel socketChannel;
         try {
-            socketChannel =
-                    AsynchronousSocketChannel.open(asynchronousChannelGroup);
+            socketChannel = 
AsynchronousSocketChannel.open(getAsynchronousChannelGroup());
         } catch (IOException ioe) {
             throw new DeploymentException(sm.getString(
                     "wsWebSocketContainer.asynchronousSocketChannelFail"), 
ioe);
@@ -798,6 +758,33 @@ public class WsWebSocketContainer
                         "wsWebSocketContainer.sessionCloseFail", 
session.getId()), ioe);
             }
         }
+
+        // Only unregister with AsyncChannelGroupUtil if this instance
+        // registered with it
+        if (asynchronousChannelGroup != null) {
+            synchronized (asynchronousChannelGroupLock) {
+                if (asynchronousChannelGroup != null) {
+                    AsyncChannelGroupUtil.unregister();
+                    asynchronousChannelGroup = null;
+                }
+            }
+        }
+    }
+
+
+    private AsynchronousChannelGroup getAsynchronousChannelGroup() {
+        // Use AsyncChannelGroupUtil to share a common group amongst all
+        // WebSocket clients
+        AsynchronousChannelGroup result = asynchronousChannelGroup;
+        if (result == null) {
+            synchronized (asynchronousChannelGroupLock) {
+                if (asynchronousChannelGroup == null) {
+                    asynchronousChannelGroup = 
AsyncChannelGroupUtil.register();
+                }
+                result = asynchronousChannelGroup;
+            }
+        }
+        return result;
     }
 
 
@@ -835,23 +822,4 @@ public class WsWebSocketContainer
     public int getProcessPeriod() {
         return processPeriod;
     }
-
-
-    /**
-     * Create threads for AsyncIO that have the right context class loader to
-     * prevent memory leaks.
-     */
-    private static class AsyncIOThreadFactory implements ThreadFactory {
-
-        private AtomicInteger count = new AtomicInteger(0);
-
-        @Override
-        public Thread newThread(Runnable r) {
-            Thread t = new Thread(r);
-            t.setName("WebSocketClient-AsyncIO-" + count.incrementAndGet());
-            t.setContextClassLoader(this.getClass().getClassLoader());
-            t.setDaemon(true);
-            return t;
-        }
-    }
 }

Modified: 
tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java?rev=1589100&r1=1589099&r2=1589100&view=diff
==============================================================================
--- tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java 
(original)
+++ tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java 
Tue Apr 22 12:07:04 2014
@@ -110,6 +110,8 @@ public class TestWsWebSocketContainer ex
         Queue<String> messages = handler.getMessages();
         Assert.assertEquals(1, messages.size());
         Assert.assertEquals(MESSAGE_STRING_1, messages.peek());
+
+        ((WsWebSocketContainer) wsContainer).destroy();
     }
 
 



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to