Author: markt Date: Wed Jan 30 19:51:47 2013 New Revision: 1440622 URL: http://svn.apache.org/viewvc?rev=1440622&view=rev Log: Implement client and server async timeout. Note: this doesn't work for BIO, as it always blocks and only pretends to do async writes.
Added: tomcat/trunk/java/org/apache/tomcat/websocket/server/WsListener.java (with props) tomcat/trunk/java/org/apache/tomcat/websocket/server/WsTimeout.java (with props) Modified: tomcat/trunk/java/javax/websocket/RemoteEndpoint.java tomcat/trunk/java/javax/websocket/WebSocketContainer.java tomcat/trunk/java/org/apache/catalina/loader/WebappClassLoader.java tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointClient.java tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java tomcat/trunk/java/org/apache/tomcat/websocket/WsWebSocketContainer.java tomcat/trunk/java/org/apache/tomcat/websocket/server/ServerContainerImpl.java tomcat/trunk/java/org/apache/tomcat/websocket/server/WsProtocolHandler.java tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointServer.java tomcat/trunk/java/org/apache/tomcat/websocket/server/WsSci.java tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java Modified: tomcat/trunk/java/javax/websocket/RemoteEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/javax/websocket/RemoteEndpoint.java?rev=1440622&r1=1440621&r2=1440622&view=diff ============================================================================== --- tomcat/trunk/java/javax/websocket/RemoteEndpoint.java (original) +++ tomcat/trunk/java/javax/websocket/RemoteEndpoint.java Wed Jan 30 19:51:47 2013 @@ -28,7 +28,20 @@ public interface RemoteEndpoint { void setBatchingAllowed(boolean batchingAllowed); boolean getBatchingAllowed(); void flushBatch(); + + /** + * Obtain the timeout (in milliseconds) for sending a message + * asynchronously. A non-positive value means an infinite timeout. The + * default value is determined by + * {@link WebSocketContainer#getDefaultAsyncSendTimeout()}. + */ long getAsyncSendTimeout(); + + /** + * Set the timeout (in milliseconds) for sending a message asynchronously. 
A + * non-positive value means an infinite timeout. The default value is + * determined by {@link WebSocketContainer#getDefaultAsyncSendTimeout()}. + */ void setAsyncSendTimeout(long timeout); /** Modified: tomcat/trunk/java/javax/websocket/WebSocketContainer.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/javax/websocket/WebSocketContainer.java?rev=1440622&r1=1440621&r2=1440622&view=diff ============================================================================== --- tomcat/trunk/java/javax/websocket/WebSocketContainer.java (original) +++ tomcat/trunk/java/javax/websocket/WebSocketContainer.java Wed Jan 30 19:51:47 2013 @@ -21,8 +21,16 @@ import java.util.Set; public interface WebSocketContainer { + /** + * Obtain the default timeout (in milliseconds) for sending a message + * asynchronously. A non-positive value means an infinite timeout. + */ long getDefaultAsyncSendTimeout(); + /** + * Set the default timeout (in milliseconds) for sending a message + * asynchronously. A non-positive value means an infinite timeout. 
+ */ void setAsyncSendTimeout(long timeout); Session connectToServer(Class<?> annotatedEndpointClass, URI path) Modified: tomcat/trunk/java/org/apache/catalina/loader/WebappClassLoader.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/loader/WebappClassLoader.java?rev=1440622&r1=1440621&r2=1440622&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/loader/WebappClassLoader.java (original) +++ tomcat/trunk/java/org/apache/catalina/loader/WebappClassLoader.java Wed Jan 30 19:51:47 2013 @@ -2233,18 +2233,19 @@ public class WebappClassLoader Object[] table = (Object[]) internalTableField.get(map); if (table != null) { for (int j =0; j < table.length; j++) { - if (table[j] != null) { + Object obj = table[j]; + if (obj != null) { boolean potentialLeak = false; // Check the key - Object key = ((Reference<?>) table[j]).get(); + Object key = ((Reference<?>) obj).get(); if (this.equals(key) || loadedByThisOrChild(key)) { potentialLeak = true; } // Check the value Field valueField = - table[j].getClass().getDeclaredField("value"); + obj.getClass().getDeclaredField("value"); valueField.setAccessible(true); - Object value = valueField.get(table[j]); + Object value = valueField.get(obj); if (this.equals(value) || loadedByThisOrChild(value)) { potentialLeak = true; } Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java?rev=1440622&r1=1440621&r2=1440622&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java (original) +++ tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java Wed Jan 30 19:51:47 2013 @@ -49,41 +49,42 @@ public abstract class WsRemoteEndpointBa private final AtomicBoolean toBytesInProgress = new 
AtomicBoolean(false); private final CharsetEncoder encoder = Charset.forName("UTF8").newEncoder(); private final MessageSendStateMachine state = new MessageSendStateMachine(); + + private volatile long asyncSendTimeout = -1; + // Max length for WebSocket frame header is 14 bytes protected final ByteBuffer header = ByteBuffer.allocate(14); protected ByteBuffer payload = null; @Override - public void setBatchingAllowed(boolean batchingAllowed) { - // TODO Auto-generated method stub - + public long getAsyncSendTimeout() { + return asyncSendTimeout; } @Override - public boolean getBatchingAllowed() { - // TODO Auto-generated method stub - return false; + public void setAsyncSendTimeout(long timeout) { + this.asyncSendTimeout = timeout; } @Override - public void flushBatch() { + public void setBatchingAllowed(boolean batchingAllowed) { // TODO Auto-generated method stub } @Override - public long getAsyncSendTimeout() { + public boolean getBatchingAllowed() { // TODO Auto-generated method stub - return 0; + return false; } @Override - public void setAsyncSendTimeout(long timeout) { + public void flushBatch() { // TODO Auto-generated method stub } Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointClient.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointClient.java?rev=1440622&r1=1440621&r2=1440622&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointClient.java (original) +++ tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointClient.java Wed Jan 30 19:51:47 2013 @@ -38,8 +38,13 @@ public class WsRemoteEndpointClient exte @Override protected void sendMessage(WsCompletionHandler handler) { - channel.write(new ByteBuffer[] {header, payload}, 0, 2, Long.MAX_VALUE, - TimeUnit.DAYS, null, handler); + long timeout = getAsyncSendTimeout(); + if (timeout < 1) { + timeout = Long.MAX_VALUE; + + } + 
channel.write(new ByteBuffer[] {header, payload}, 0, 2, + getAsyncSendTimeout(), TimeUnit.MILLISECONDS, null, handler); } @Override Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java?rev=1440622&r1=1440621&r2=1440622&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java (original) +++ tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java Wed Jan 30 19:51:47 2013 @@ -76,6 +76,8 @@ public class WsSession implements Sessio this.wsRemoteEndpoint = wsRemoteEndpoint; this.webSocketContainer = webSocketContainer; applicationClassLoader = Thread.currentThread().getContextClassLoader(); + wsRemoteEndpoint.setAsyncSendTimeout( + webSocketContainer.getDefaultAsyncSendTimeout()); } Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsWebSocketContainer.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsWebSocketContainer.java?rev=1440622&r1=1440621&r2=1440622&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/websocket/WsWebSocketContainer.java (original) +++ tomcat/trunk/java/org/apache/tomcat/websocket/WsWebSocketContainer.java Wed Jan 30 19:51:47 2013 @@ -54,6 +54,7 @@ public class WsWebSocketContainer implem private static final byte[] crlf = new byte[] {13, 10}; private static final int defaultBufferSize = 8 * 1024; + private long defaultAsyncTimeout = -1; private int binaryBufferSize = defaultBufferSize; private int textBufferSize = defaultBufferSize; @@ -399,16 +400,25 @@ public class WsWebSocketContainer implem } + /** + * {@inheritDoc} + * + * The default value for this implementation is -1. 
+ */ @Override public long getDefaultAsyncSendTimeout() { - // TODO Auto-generated method stub - return 0; + return defaultAsyncTimeout; } + /** + * {@inheritDoc} + * + * The default value for this implementation is -1. + */ @Override public void setAsyncSendTimeout(long timeout) { - // TODO Auto-generated method stub + this.defaultAsyncTimeout = timeout; } private static class WsHandshakeResponse implements HandshakeResponse { Modified: tomcat/trunk/java/org/apache/tomcat/websocket/server/ServerContainerImpl.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/ServerContainerImpl.java?rev=1440622&r1=1440621&r2=1440622&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/websocket/server/ServerContainerImpl.java (original) +++ tomcat/trunk/java/org/apache/tomcat/websocket/server/ServerContainerImpl.java Wed Jan 30 19:51:47 2013 @@ -70,6 +70,8 @@ public class ServerContainerImpl extends return result; } + private final WsTimeout wsTimeout; + private final Thread timeoutThread; private volatile ServletContext servletContext = null; private Map<String,ServerEndpointConfiguration> configMap = @@ -80,11 +82,18 @@ public class ServerContainerImpl extends private ServerContainerImpl() { - // Hide default constructor + wsTimeout = new WsTimeout(); + timeoutThread = new Thread(wsTimeout); + timeoutThread.setName(WsTimeout.THREAD_NAME_PREFIX + this); + timeoutThread.start(); } public void setServletContext(ServletContext servletContext) { + if (this.servletContext == servletContext) { + return; + } + this.servletContext = servletContext; // Configure servlet context wide defaults @@ -99,6 +108,10 @@ public class ServerContainerImpl extends if (value != null) { setMaxTextMessageBufferSize(Long.parseLong(value)); } + + // Update the timeout thread name + timeoutThread.setName( + WsTimeout.THREAD_NAME_PREFIX + servletContext.getContextPath()); } @@ -212,6 +225,25 
@@ public class ServerContainerImpl extends } + protected WsTimeout getTimeout() { + return wsTimeout; + } + + + protected void stop() { + wsTimeout.stop(); + int count = 0; + while (count < 50 && timeoutThread.isAlive()) { + count ++; + try { + Thread.sleep(100); + } catch (InterruptedException e) { + // Ignore + } + } + } + + /** * Converts a path defined for a WebSocket endpoint into a path that can be * used as a servlet mapping. Added: tomcat/trunk/java/org/apache/tomcat/websocket/server/WsListener.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/WsListener.java?rev=1440622&view=auto ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/websocket/server/WsListener.java (added) +++ tomcat/trunk/java/org/apache/tomcat/websocket/server/WsListener.java Wed Jan 30 19:51:47 2013 @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tomcat.websocket.server; + +import javax.servlet.ServletContextEvent; +import javax.servlet.ServletContextListener; + +/** + * This will be added automatically to a {@link javax.servlet.ServletContext} by + * the {@link WsSci}. 
If the {@link WsSci} is disabled, this listener must be + * added manually to every {@link javax.servlet.ServletContext} that uses + * WebSocket. + */ +public class WsListener implements ServletContextListener { + + @Override + public void contextInitialized(ServletContextEvent sce) { + ServerContainerImpl sc = ServerContainerImpl.getServerContainer(); + sc.setServletContext(sce.getServletContext()); + } + + @Override + public void contextDestroyed(ServletContextEvent sce) { + ServerContainerImpl sc = ServerContainerImpl.getServerContainer(); + sc.stop(); + } +} Propchange: tomcat/trunk/java/org/apache/tomcat/websocket/server/WsListener.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: tomcat/trunk/java/org/apache/tomcat/websocket/server/WsProtocolHandler.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/WsProtocolHandler.java?rev=1440622&r1=1440621&r2=1440622&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/websocket/server/WsProtocolHandler.java (original) +++ tomcat/trunk/java/org/apache/tomcat/websocket/server/WsProtocolHandler.java Wed Jan 30 19:51:47 2013 @@ -29,7 +29,6 @@ import javax.websocket.CloseReason; import javax.websocket.CloseReason.CloseCodes; import javax.websocket.Endpoint; import javax.websocket.EndpointConfiguration; -import javax.websocket.WebSocketContainer; import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; @@ -50,14 +49,14 @@ public class WsProtocolHandler implement private final Endpoint ep; private final EndpointConfiguration endpointConfig; private final ClassLoader applicationClassLoader; - private final WebSocketContainer webSocketContainer; + private final ServerContainerImpl webSocketContainer; private WsSession wsSession; public WsProtocolHandler(Endpoint ep, EndpointConfiguration endpointConfig, - WebSocketContainer 
wsc) { + ServerContainerImpl wsc) { this.ep = ep; this.endpointConfig = endpointConfig; this.webSocketContainer = wsc; @@ -84,7 +83,7 @@ public class WsProtocolHandler implement t.setContextClassLoader(applicationClassLoader); try { WsRemoteEndpointServer wsRemoteEndpointServer = - new WsRemoteEndpointServer(sos); + new WsRemoteEndpointServer(sos, webSocketContainer); wsSession = new WsSession( ep, wsRemoteEndpointServer, webSocketContainer); WsFrameServer wsFrame = new WsFrameServer( Modified: tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointServer.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointServer.java?rev=1440622&r1=1440621&r2=1440622&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointServer.java (original) +++ tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointServer.java Wed Jan 30 19:51:47 2013 @@ -17,6 +17,7 @@ package org.apache.tomcat.websocket.server; import java.io.IOException; +import java.net.SocketTimeoutException; import javax.servlet.ServletOutputStream; @@ -38,15 +39,19 @@ public class WsRemoteEndpointServer exte LogFactory.getLog(WsProtocolHandler.class); private final ServletOutputStream sos; + private final WsTimeout wsTimeout; private volatile WsCompletionHandler handler = null; + private volatile long timeoutExpiry = -1; private volatile boolean close; private volatile Long size = null; private volatile boolean headerWritten = false; private volatile boolean payloadWritten = false; - public WsRemoteEndpointServer(ServletOutputStream sos) { + public WsRemoteEndpointServer(ServletOutputStream sos, + ServerContainerImpl serverContainer) { this.sos = sos; + this.wsTimeout = serverContainer.getTimeout(); } @@ -79,19 +84,29 @@ public class WsRemoteEndpointServer exte sos.write(payload.array(), payload.arrayOffset(), payload.limit()); } else 
{ + wsTimeout.unregister(this); if (close) { - sos.close(); + close(); } handler.completed(size, null); - size = null; - handler = null; - headerWritten = false; - payloadWritten = false; + nextWrite(); break; } } } catch (IOException ioe) { + wsTimeout.unregister(this); + close(); handler.failed(ioe, null); + nextWrite(); + } + if (handler != null) { + // Async write is in progress + + timeoutExpiry = getAsyncSendTimeout() + System.currentTimeMillis(); + if (timeoutExpiry > 0) { + // Register with timeout thread + wsTimeout.register(this); + } } } @@ -105,5 +120,26 @@ public class WsRemoteEndpointServer exte log.info(sm.getString("wsRemoteEndpointServer.closeFailed"), e); } } + wsTimeout.unregister(this); + } + + + protected long getTimeoutExpiry() { + return timeoutExpiry; + } + + + protected void onTimeout() { + close(); + handler.failed(new SocketTimeoutException(), null); + nextWrite(); + } + + + private void nextWrite() { + handler = null; + size = null; + headerWritten = false; + payloadWritten = false; } } Modified: tomcat/trunk/java/org/apache/tomcat/websocket/server/WsSci.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/WsSci.java?rev=1440622&r1=1440621&r2=1440622&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/websocket/server/WsSci.java (original) +++ tomcat/trunk/java/org/apache/tomcat/websocket/server/WsSci.java Wed Jan 30 19:51:47 2013 @@ -35,12 +35,14 @@ public class WsSci implements ServletCon @Override public void onStartup(Set<Class<?>> clazzes, ServletContext ctx) throws ServletException { - // Need to configure the ServletContext in all cases - ServerContainerImpl sc = ServerContainerImpl.getServerContainer(); - sc.setServletContext(ctx); + + ctx.addListener(WsListener.class); + if (clazzes == null || clazzes.size() == 0) { return; } + + ServerContainerImpl sc = ServerContainerImpl.getServerContainer(); for (Class<?> clazz 
: clazzes) { WebSocketEndpoint annotation = clazz.getAnnotation(WebSocketEndpoint.class); Added: tomcat/trunk/java/org/apache/tomcat/websocket/server/WsTimeout.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/WsTimeout.java?rev=1440622&view=auto ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/websocket/server/WsTimeout.java (added) +++ tomcat/trunk/java/org/apache/tomcat/websocket/server/WsTimeout.java Wed Jan 30 19:51:47 2013 @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tomcat.websocket.server; + +import java.util.Comparator; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.ConcurrentSkipListSet; + +/** + * Provides timeouts for asynchronous web socket writes. On the server side we + * only have access to {@link javax.servlet.ServletOutputStream} and + * {@link javax.servlet.ServletInputStream} so there is no way to set a timeout + * for writes to the client. Hence the separate thread. 
+ */ +public class WsTimeout implements Runnable { + + public static final String THREAD_NAME_PREFIX = "Websocket Timeout - "; + + private final Set<WsRemoteEndpointServer> endpoints = + new ConcurrentSkipListSet<>(new EndpointComparator()); + private volatile boolean running = true; + + public void stop() { + running = false; + synchronized (this) { + this.notify(); + } + } + + + @Override + public void run() { + while (running) { + // Wait for one second - no need for timeouts more frequently than + // that + synchronized (this) { + try { + wait(1000); + } catch (InterruptedException e) { + // Ignore + } + } + + long now = System.currentTimeMillis(); + Iterator<WsRemoteEndpointServer> iter = endpoints.iterator(); + while (iter.hasNext()) { + WsRemoteEndpointServer endpoint = iter.next(); + if (endpoint.getTimeoutExpiry() < now) { + System.out.println(now); + endpoint.onTimeout(); + } else { + // Endpoints are ordered by timeout expiry so we reach this + // point there is no need to check the remaining endpoints + break; + } + } + } + } + + + public void register(WsRemoteEndpointServer endpoint) { + endpoints.add(endpoint); + } + + + public void unregister(WsRemoteEndpointServer endpoint) { + endpoints.remove(endpoint); + } + + + /** + * Note: this comparator imposes orderings that are inconsistent with equals + */ + private static class EndpointComparator implements + Comparator<WsRemoteEndpointServer> { + + @Override + public int compare(WsRemoteEndpointServer o1, + WsRemoteEndpointServer o2) { + + long t1 = o1.getTimeoutExpiry(); + long t2 = o2.getTimeoutExpiry(); + + if (t1 < t2) { + return -1; + } else if (t1 == t2) { + return 0; + } else { + return 1; + } + } + } +} Propchange: tomcat/trunk/java/org/apache/tomcat/websocket/server/WsTimeout.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java URL: 
http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java?rev=1440622&r1=1440621&r2=1440622&view=diff ============================================================================== --- tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java (original) +++ tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java Wed Jan 30 19:51:47 2013 @@ -22,6 +22,8 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import javax.servlet.ServletContextEvent; @@ -29,12 +31,15 @@ import javax.servlet.ServletContextListe import javax.websocket.CloseReason; import javax.websocket.ContainerProvider; import javax.websocket.DefaultClientConfiguration; +import javax.websocket.DeploymentException; import javax.websocket.Endpoint; import javax.websocket.EndpointConfiguration; import javax.websocket.MessageHandler; +import javax.websocket.SendResult; import javax.websocket.Session; import javax.websocket.WebSocketContainer; import javax.websocket.WebSocketMessage; +import javax.websocket.server.DefaultServerConfiguration; import org.junit.Assert; import org.junit.Test; @@ -42,7 +47,9 @@ import org.junit.Test; import org.apache.catalina.Context; import org.apache.catalina.startup.Tomcat; import org.apache.catalina.startup.TomcatBaseTest; +import org.apache.coyote.http11.Http11Protocol; import org.apache.tomcat.websocket.server.ServerContainerImpl; +import org.apache.tomcat.websocket.server.WsListener; public class TestWsWebSocketContainer extends TomcatBaseTest { @@ -50,6 +57,8 @@ public class TestWsWebSocketContainer ex private static final String MESSAGE_TEXT_4K; private static final byte[] MESSAGE_BINARY_4K = new byte[4096]; + private static final long TIMEOUT_MS = 5 * 1000; + static { 
StringBuilder sb = new StringBuilder(4096); for (int i = 0; i < 4096; i++) { @@ -246,6 +255,161 @@ public class TestWsWebSocketContainer ex } } + + @Test + public void testTimeoutClientContainer() throws Exception { + doTestTimeoutClient(true); + } + + + @Test + public void testTimeoutClientEndpoint() throws Exception { + doTestTimeoutClient(false); + } + + + private void doTestTimeoutClient(boolean setTimeoutOnContainer) + throws Exception { + + Tomcat tomcat = getTomcatInstance(); + // Must have a real docBase - just use temp + Context ctx = + tomcat.addContext("", System.getProperty("java.io.tmpdir")); + ctx.addApplicationListener(BlockingConfig.class.getName()); + + WebSocketContainer wsContainer = ContainerProvider.getClientContainer(); + + // Reset client buffer size as client container is retained between + // tests + wsContainer.setMaxBinaryMessageBufferSize(8192); + wsContainer.setMaxTextMessageBufferSize(8192); + + + // Set the async timeout + if (setTimeoutOnContainer) { + wsContainer.setAsyncSendTimeout(TIMEOUT_MS); + } + + tomcat.start(); + + Session wsSession = wsContainer.connectToServer(TesterEndpoint.class, + new DefaultClientConfiguration(), new URI("http://localhost:"; + + getPort() + BlockingConfig.PATH)); + + if (!setTimeoutOnContainer) { + wsSession.getRemote().setAsyncSendTimeout(TIMEOUT_MS); + } + + long lastSend = 0; + boolean isOK = true; + SendResult sr = null; + + // Should send quickly until the network buffers fill up and then block + // until the timeout kicks in + while (isOK) { + Future<SendResult> f = wsSession.getRemote().sendBytesByFuture( + ByteBuffer.wrap(MESSAGE_BINARY_4K)); + lastSend = System.currentTimeMillis(); + sr = f.get(); + isOK = sr.isOK(); + } + + long timeout = System.currentTimeMillis() - lastSend; + + // Check correct time passed + Assert.assertTrue(timeout >= TIMEOUT_MS); + + // Check the timeout wasn't too long + Assert.assertTrue(timeout < TIMEOUT_MS*2); + + if (sr == null) { + Assert.fail(); + } else { + 
Assert.assertNotNull(sr.getException()); + } + } + + + @Test + public void testTimeoutServerContainer() throws Exception { + doTestTimeoutServer(true); + } + + + @Test + public void testTimeoutServerEndpoint() throws Exception { + doTestTimeoutServer(false); + } + + + private static volatile boolean timoutOnContainer = false; + + private void doTestTimeoutServer(boolean setTimeoutOnContainer) + throws Exception { + + /* + * Note: There are all sorts of horrible uses of statics in this test + * because the API uses classes and the tests really need access + * to the instances which simply isn't possible. + */ + timoutOnContainer = setTimeoutOnContainer; + + Tomcat tomcat = getTomcatInstance(); + + if (getProtocol().equals(Http11Protocol.class.getName())) { + // This will never work for BIO + return; + } + + // Must have a real docBase - just use temp + Context ctx = + tomcat.addContext("", System.getProperty("java.io.tmpdir")); + ctx.addApplicationListener(WsListener.class.getName()); + ctx.addApplicationListener(ConstantTxConfig.class.getName()); + + WebSocketContainer wsContainer = ContainerProvider.getClientContainer(); + + // Reset client buffer size as client container is retained between + // tests + wsContainer.setMaxBinaryMessageBufferSize(8192); + wsContainer.setMaxTextMessageBufferSize(8192); + + tomcat.start(); + + Session wsSession = wsContainer.connectToServer(TesterEndpoint.class, + new DefaultClientConfiguration(), new URI("http://localhost:"; + + getPort() + ConstantTxConfig.PATH)); + + wsSession.addMessageHandler(new BlockingBinaryHandler()); + + int loops = 0; + while (loops < 60) { + Thread.sleep(1000); + if (!ConstantTxEndpoint.getRunning()) { + break; + } + } + + // Check nothing really bad happened + Assert.assertNull(ConstantTxEndpoint.getException()); + + System.out.println(ConstantTxEndpoint.getTimeout()); + // Check correct time passed + Assert.assertTrue(ConstantTxEndpoint.getTimeout() >= TIMEOUT_MS); + + // Check the timeout wasn't too 
long + Assert.assertTrue(ConstantTxEndpoint.getTimeout() < TIMEOUT_MS*2); + + if (ConstantTxEndpoint.getSendResult() == null) { + Assert.fail(); + } else { + Assert.assertNotNull( + ConstantTxEndpoint.getSendResult().getException()); + } + + } + + private abstract static class TesterMessageHandler<T> implements MessageHandler.Basic<T> { @@ -411,4 +575,151 @@ public class TestWsWebSocketContainer ex } } } + + + public static class BlockingConfig implements ServletContextListener { + + public static final String PATH = "/block"; + + @Override + public void contextInitialized(ServletContextEvent sce) { + ServerContainerImpl sc = ServerContainerImpl.getServerContainer(); + sc.publishServer(BlockingPojo.class, sce.getServletContext(), PATH); + } + + @Override + public void contextDestroyed(ServletContextEvent sce) { + // NO-OP + } + } + + + public static class BlockingPojo { + @SuppressWarnings("unused") + @WebSocketMessage + public void echoTextMessage(Session session, String msg, boolean last) { + try { + Thread.sleep(60000); + } catch (InterruptedException e) { + // Ignore + } + } + + + @SuppressWarnings("unused") + @WebSocketMessage + public void echoBinaryMessage(Session session, ByteBuffer msg, + boolean last) { + try { + Thread.sleep(TIMEOUT_MS * 10); + } catch (InterruptedException e) { + // Ignore + } + } + } + + + public static class BlockingBinaryHandler + implements MessageHandler.Async<ByteBuffer> { + + @Override + public void onMessage(ByteBuffer messagePart, boolean last) { + try { + Thread.sleep(TIMEOUT_MS * 10); + } catch (InterruptedException e) { + // Ignore + } + } + } + + + public static class ConstantTxEndpoint extends Endpoint { + + // Have to be static to be able to retrieve results from test case + private static volatile long timeout = -1; + private static volatile boolean ok = true; + private static volatile SendResult sr = null; + private static volatile Exception exception = null; + private static volatile boolean running = true; + + + 
@Override + public void onOpen(Session session, EndpointConfiguration config) { + + // Reset everything + timeout = -1; + ok = true; + sr = null; + exception = null; + running = true; + + if (!TestWsWebSocketContainer.timoutOnContainer) { + session.getRemote().setAsyncSendTimeout(TIMEOUT_MS); + } + + long lastSend = 0; + + // Should send quickly until the network buffers fill up and then + // block until the timeout kicks in + try { + while (ok) { + lastSend = System.currentTimeMillis(); + Future<SendResult> f = session.getRemote().sendBytesByFuture( + ByteBuffer.wrap(MESSAGE_BINARY_4K)); + sr = f.get(); + ok = sr.isOK(); + } + } catch (ExecutionException | InterruptedException e) { + exception = e; + } + timeout = System.currentTimeMillis() - lastSend; + running = false; + } + + public static long getTimeout() { + return timeout; + } + + public static boolean isOK() { + return ok; + } + + public static SendResult getSendResult() { + return sr; + } + + public static Exception getException() { + return exception; + } + + public static boolean getRunning() { + return running; + } + } + + + public static class ConstantTxConfig implements ServletContextListener { + + private static final String PATH = "/test"; + + @Override + public void contextInitialized(ServletContextEvent sce) { + ServerContainerImpl sc = ServerContainerImpl.getServerContainer(); + sc.setServletContext(sce.getServletContext()); + try { + sc.publishServer(ConstantTxEndpoint.class, PATH, + DefaultServerConfiguration.class); + if (TestWsWebSocketContainer.timoutOnContainer) { + sc.setAsyncSendTimeout(TIMEOUT_MS); + } + } catch (DeploymentException e) { + throw new IllegalStateException(e); + } + } + + @Override + public void contextDestroyed(ServletContextEvent sce) { + // NO-OP + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org