Author: markt Date: Thu Jan 24 22:24:37 2013 New Revision: 1438229 URL: http://svn.apache.org/viewvc?rev=1438229&view=rev Log: Implement configurable buffer sizes. Also addresses: - Pass copies rather than original byte buffers to application code - Implement user properties for sessions - Ensure sessions are only closed once - Make calling of onClose() consistent
Modified: tomcat/trunk/java/javax/websocket/WebSocketContainer.java tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties tomcat/trunk/java/org/apache/tomcat/websocket/WsFrameBase.java tomcat/trunk/java/org/apache/tomcat/websocket/WsFrameClient.java tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java tomcat/trunk/java/org/apache/tomcat/websocket/WsWebSocketContainer.java tomcat/trunk/java/org/apache/tomcat/websocket/pojo/PojoEndpoint.java tomcat/trunk/java/org/apache/tomcat/websocket/server/Constants.java tomcat/trunk/java/org/apache/tomcat/websocket/server/ServerContainerImpl.java tomcat/trunk/java/org/apache/tomcat/websocket/server/WsFrameServer.java tomcat/trunk/java/org/apache/tomcat/websocket/server/WsProtocolHandler.java tomcat/trunk/java/org/apache/tomcat/websocket/server/WsServlet.java tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java tomcat/trunk/webapps/examples/WEB-INF/classes/websocket/echo/WsConfigListener.java tomcat/trunk/webapps/examples/WEB-INF/web.xml Modified: tomcat/trunk/java/javax/websocket/WebSocketContainer.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/javax/websocket/WebSocketContainer.java?rev=1438229&r1=1438228&r2=1438229&view=diff ============================================================================== --- tomcat/trunk/java/javax/websocket/WebSocketContainer.java (original) +++ tomcat/trunk/java/javax/websocket/WebSocketContainer.java Thu Jan 24 22:24:37 2013 @@ -53,12 +53,24 @@ public interface WebSocketContainer { void setMaxSessionIdleTimeout(long timeout); + /** + * Get the current maximum buffer size (in bytes) for binary messages. + */ long getMaxBinaryMessageBufferSize(); + /** + * Set the current maximum buffer size (in bytes) for binary messages. + */ void setMaxBinaryMessageBufferSize(long max); + /** + * Get the current maximum buffer size (in characters) for text messages. 
+ */ long getMaxTextMessageBufferSize(); + /** + * Set the current maximum buffer size (in characters) for text messages. + */ void setMaxTextMessageBufferSize(long max); Set<Extension> getInstalledExtensions(); Modified: tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties?rev=1438229&r1=1438228&r2=1438229&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties (original) +++ tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties Thu Jan 24 22:24:37 2013 @@ -37,6 +37,7 @@ wsFrame.wrongRsv=The client frame set th wsRemoteEndpoint.concurrentMessageSend=Messages may not be send concurrently even when using the asynchronous send messages. The client must wait for the previous message to complete before sending the next. wsWebSocketContainer.invalidStatus=The HTTP response from the server [{0}] did not permit the HTTP upgrade to WebSocket +wsWebSocketContainer.maxBuffer=This implementation limits the maximum size of a buffer to Integer.MAX_VALUE wsWebSocketContainer.pathNoHost=No host was specified in URI wsWebSocketContainer.pathWrongScheme=The scheme [{0}] is not supported wsWebSocketContainer.endpointCreateFail=Failed to create a local endpoint of type [{0}] \ No newline at end of file Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsFrameBase.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsFrameBase.java?rev=1438229&r1=1438228&r2=1438229&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/websocket/WsFrameBase.java (original) +++ tomcat/trunk/java/org/apache/tomcat/websocket/WsFrameBase.java Thu Jan 24 22:24:37 2013 @@ -51,6 +51,7 @@ public abstract class WsFrameBase { private final CharBuffer 
controlBufferText = CharBuffer.allocate(125); // Attributes of the current message + // TODO: May need a new ByteBuffer per message private final ByteBuffer messageBufferBinary; private final CharBuffer messageBufferText; private final CharsetDecoder utf8DecoderControl = new Utf8Decoder(). @@ -76,20 +77,13 @@ public abstract class WsFrameBase { private int readPos = 0; protected int writePos = 0; - public WsFrameBase(WsSession wsSession) { - this.wsSession = wsSession; + public WsFrameBase(int binaryBufferSize, int textBufferSize, + WsSession wsSession) { - // TODO This needs to work for client and server side code - /* - int readBufferSize = - ServerContainerImpl.getServerContainer().getReadBufferSize(); - */ - // Temp hack until the above is resolved - int readBufferSize = 8192; - - inputBuffer = new byte[readBufferSize]; - messageBufferBinary = ByteBuffer.allocate(readBufferSize); - messageBufferText = CharBuffer.allocate(readBufferSize); + inputBuffer = new byte[binaryBufferSize]; + messageBufferBinary = ByteBuffer.allocate(binaryBufferSize); + messageBufferText = CharBuffer.allocate(textBufferSize); + this.wsSession = wsSession; } @@ -414,7 +408,11 @@ public abstract class WsFrameBase { } else { // Ran out of message buffer - flush it messageBufferBinary.flip(); - sendMessageBinary(false); + ByteBuffer copy = + ByteBuffer.allocate(messageBufferBinary.limit()); + copy.put(messageBufferBinary); + copy.flip(); + sendMessageBinary(copy, false); messageBufferBinary.clear(); } } @@ -426,7 +424,11 @@ public abstract class WsFrameBase { } else { // Message is complete - send it messageBufferBinary.flip(); - sendMessageBinary(true); + ByteBuffer copy = + ByteBuffer.allocate(messageBufferBinary.limit()); + copy.put(messageBufferBinary); + copy.flip(); + sendMessageBinary(copy, true); messageBufferBinary.clear(); newMessage(); } @@ -436,19 +438,18 @@ public abstract class WsFrameBase { @SuppressWarnings("unchecked") - private void sendMessageBinary(boolean last) { + 
private void sendMessageBinary(ByteBuffer msg, boolean last) { MessageHandler mh = wsSession.getBinaryMessageHandler(); if (mh != null) { if (mh instanceof MessageHandler.Async<?>) { - ((MessageHandler.Async<ByteBuffer>) mh).onMessage( - messageBufferBinary, last); + ((MessageHandler.Async<ByteBuffer>) mh).onMessage(msg, last); } else { - ((MessageHandler.Basic<ByteBuffer>) mh).onMessage( - messageBufferBinary); + ((MessageHandler.Basic<ByteBuffer>) mh).onMessage(msg); } } } + private void newMessage() { messageBufferBinary.clear(); messageBufferText.clear(); Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsFrameClient.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsFrameClient.java?rev=1438229&r1=1438228&r2=1438229&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/websocket/WsFrameClient.java (original) +++ tomcat/trunk/java/org/apache/tomcat/websocket/WsFrameClient.java Thu Jan 24 22:24:37 2013 @@ -31,8 +31,8 @@ public class WsFrameClient extends WsFra private final CompletionHandler<Integer,Void> handler; public WsFrameClient(ByteBuffer response, AsynchronousSocketChannel channel, - WsSession wsSession) { - super(wsSession); + int binaryBufferSize, int textBufferSize, WsSession wsSession) { + super(binaryBufferSize, textBufferSize, wsSession); this.response = response; this.channel = channel; this.handler = new WsFrameClientCompletionHandler(); Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java?rev=1438229&r1=1438228&r2=1438229&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java (original) +++ tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java Thu Jan 24 22:24:37 2013 @@ -28,6 +28,7 @@ import java.util.HashSet; 
import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import javax.websocket.CloseReason; import javax.websocket.CloseReason.CloseCodes; @@ -50,6 +51,8 @@ public class WsSession implements Sessio private MessageHandler binaryMessageHandler = null; private MessageHandler.Basic<PongMessage> pongMessageHandler = null; private volatile boolean open = true; + private final Object closeLock = new Object(); + private Map<String,Object> userProperties = new ConcurrentHashMap<>(); /** @@ -217,27 +220,36 @@ public class WsSession implements Sessio @Override public void close(CloseReason closeReason) throws IOException { - open = false; + // Double-checked locking. OK because open is volatile + if (!open) { + return; + } + synchronized (closeLock) { + if (!open) { + return; + } + open = false; + + // Send the close message + // 125 is maximum size for the payload of a control message + ByteBuffer msg = ByteBuffer.allocate(125); + msg.putShort((short) closeReason.getCloseCode().getCode()); + String reason = closeReason.getReasonPhrase(); + if (reason != null && reason.length() > 0) { + msg.put(reason.getBytes(UTF8)); + } + msg.flip(); + wsRemoteEndpoint.sendMessageBlocking(Constants.OPCODE_CLOSE, msg, true); - // Send the close message - // 125 is maximum size for the payload of a control message - ByteBuffer msg = ByteBuffer.allocate(125); - msg.putShort((short) closeReason.getCloseCode().getCode()); - String reason = closeReason.getReasonPhrase(); - if (reason != null && reason.length() > 0) { - msg.put(reason.getBytes(UTF8)); - } - msg.flip(); - wsRemoteEndpoint.sendMessageBlocking(Constants.OPCODE_CLOSE, msg, true); - - // Fire the onClose event - Thread t = Thread.currentThread(); - ClassLoader cl = t.getContextClassLoader(); - t.setContextClassLoader(applicationClassLoader); - try { - localEndpoint.onClose(this, closeReason); - } finally { - t.setContextClassLoader(cl); + // Fire the onClose event + Thread t = 
Thread.currentThread(); + ClassLoader cl = t.getContextClassLoader(); + t.setContextClassLoader(applicationClassLoader); + try { + localEndpoint.onClose(this, closeReason); + } finally { + t.setContextClassLoader(cl); + } } } @@ -272,8 +284,7 @@ public class WsSession implements Sessio @Override public Map<String,Object> getUserProperties() { - // TODO Auto-generated method stub - return null; + return userProperties; } Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsWebSocketContainer.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsWebSocketContainer.java?rev=1438229&r1=1438228&r2=1438229&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/websocket/WsWebSocketContainer.java (original) +++ tomcat/trunk/java/org/apache/tomcat/websocket/WsWebSocketContainer.java Thu Jan 24 22:24:37 2013 @@ -52,7 +52,10 @@ public class WsWebSocketContainer implem private static final Random random = new Random(); private static final Charset iso88591 = Charset.forName("ISO-8859-1"); private static final byte[] crlf = new byte[] {13, 10}; + private static final int defaultBufferSize = 8 * 1024; + private int binaryBufferSize = defaultBufferSize; + private int textBufferSize = defaultBufferSize; @Override public Session connectToServer(Class<?> annotatedEndpointClass, URI path) @@ -120,8 +123,8 @@ public class WsWebSocketContainer implem thisWrite = fWrite.get(); toWrite -= thisWrite.intValue(); } - // TODO Needs to be same size or smaller than WsFrame input buffer - response = ByteBuffer.allocate(4 * 1024); + // Same size as the WsFrame input buffer + response = ByteBuffer.allocate(binaryBufferSize); HandshakeResponse handshakeResponse = processResponse(response, channel); @@ -148,8 +151,8 @@ public class WsWebSocketContainer implem // Object creation will trigger input processing @SuppressWarnings("unused") - WsFrameClient wsFrameClient = - new 
WsFrameClient(response, channel, wsSession); + WsFrameClient wsFrameClient = new WsFrameClient(response, channel, + binaryBufferSize, textBufferSize, wsSession); return wsSession; } @@ -358,27 +361,33 @@ public class WsWebSocketContainer implem @Override public long getMaxBinaryMessageBufferSize() { - // TODO Auto-generated method stub - return 0; + return binaryBufferSize; } @Override public void setMaxBinaryMessageBufferSize(long max) { - // TODO Auto-generated method stub + if (max > Integer.MAX_VALUE) { + throw new IllegalArgumentException( + sm.getString("wsWebSocketContainer.maxBuffer")); + } + binaryBufferSize = (int) max; } @Override public long getMaxTextMessageBufferSize() { - // TODO Auto-generated method stub - return 0; + return textBufferSize; } @Override public void setMaxTextMessageBufferSize(long max) { - // TODO Auto-generated method stub + if (max > Integer.MAX_VALUE) { + throw new IllegalArgumentException( + sm.getString("wsWebSocketContainer.maxBuffer")); + } + textBufferSize = (int) max; } Modified: tomcat/trunk/java/org/apache/tomcat/websocket/pojo/PojoEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/pojo/PojoEndpoint.java?rev=1438229&r1=1438228&r2=1438229&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/websocket/pojo/PojoEndpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/websocket/pojo/PojoEndpoint.java Thu Jan 24 22:24:37 2013 @@ -16,7 +16,6 @@ */ package org.apache.tomcat.websocket.pojo; -import java.io.IOException; import java.lang.reflect.InvocationTargetException; import javax.websocket.CloseReason; @@ -66,17 +65,7 @@ public class PojoEndpoint extends Endpoi @Override public void onClose(Session session, CloseReason closeReason) { - if (methodMapping.getOnClose() == null) { - // If the POJO doesn't handle the close, close the connection - try { - if (session.isOpen()) { - session.close(closeReason); 
- } - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } else { + if (methodMapping.getOnClose() != null) { try { methodMapping.getOnClose().invoke(pojo, methodMapping.getOnCloseArgs(pathInfo, session)); Modified: tomcat/trunk/java/org/apache/tomcat/websocket/server/Constants.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/Constants.java?rev=1438229&r1=1438228&r2=1438229&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/websocket/server/Constants.java (original) +++ tomcat/trunk/java/org/apache/tomcat/websocket/server/Constants.java Thu Jan 24 22:24:37 2013 @@ -25,6 +25,11 @@ public class Constants { Constants.class.getPackage().getName(); protected static final String SERVLET_NAME = WsServlet.class.getName(); + public static final String BINARY_BUFFER_SIZE_SERVLET_CONTEXT_INIT_PARAM = + "org.apache.tomcat.websocket.binaryBufferSize"; + public static final String TEXT_BUFFER_SIZE_SERVLET_CONTEXT_INIT_PARAM = + "org.apache.tomcat.websocket.textBufferSize"; + private Constants() { // Hide default constructor } Modified: tomcat/trunk/java/org/apache/tomcat/websocket/server/ServerContainerImpl.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/ServerContainerImpl.java?rev=1438229&r1=1438228&r2=1438229&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/websocket/server/ServerContainerImpl.java (original) +++ tomcat/trunk/java/org/apache/tomcat/websocket/server/ServerContainerImpl.java Thu Jan 24 22:24:37 2013 @@ -37,7 +37,13 @@ import org.apache.tomcat.websocket.pojo. /** * Provides a per class loader (i.e. per web application) instance of a - * ServerContainer. + * ServerContainer. 
Web application wide defaults may be configured by setting + * the following servlet context initialisation parameters to the desired + * values. + * <ul> + * <li>{@link Constants#BINARY_BUFFER_SIZE_SERVLET_CONTEXT_INIT_PARAM}</li> + * <li>{@link Constants#TEXT_BUFFER_SIZE_SERVLET_CONTEXT_INIT_PARAM}</li> + * </ul> */ public class ServerContainerImpl extends WsWebSocketContainer { @@ -71,7 +77,6 @@ public class ServerContainerImpl extends private Map<String,Class<?>> pojoMap = new ConcurrentHashMap<>(); private Map<Class<?>,PojoMethodMapping> pojoMethodMap = new ConcurrentHashMap<>(); - private volatile int readBufferSize = 8192; private ServerContainerImpl() { @@ -81,6 +86,19 @@ public class ServerContainerImpl extends public void setServletContext(ServletContext servletContext) { this.servletContext = servletContext; + + // Configure servlet context wide defaults + String value = servletContext.getInitParameter( + Constants.BINARY_BUFFER_SIZE_SERVLET_CONTEXT_INIT_PARAM); + if (value != null) { + setMaxBinaryMessageBufferSize(Long.parseLong(value)); + } + + value = servletContext.getInitParameter( + Constants.TEXT_BUFFER_SIZE_SERVLET_CONTEXT_INIT_PARAM); + if (value != null) { + setMaxTextMessageBufferSize(Long.parseLong(value)); + } } @@ -194,18 +212,6 @@ public class ServerContainerImpl extends } - - public int getReadBufferSize() { - return readBufferSize; - } - - - - public void setReadBufferSize(int readBufferSize) { - this.readBufferSize = readBufferSize; - } - - /** * Converts a path defined for a WebSocket endpoint into a path that can be * used as a servlet mapping. 
Modified: tomcat/trunk/java/org/apache/tomcat/websocket/server/WsFrameServer.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/WsFrameServer.java?rev=1438229&r1=1438228&r2=1438229&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/websocket/server/WsFrameServer.java (original) +++ tomcat/trunk/java/org/apache/tomcat/websocket/server/WsFrameServer.java Thu Jan 24 22:24:37 2013 @@ -30,8 +30,9 @@ public class WsFrameServer extends WsFra private final Object connectionReadLock = new Object(); - public WsFrameServer(ServletInputStream sis, WsSession wsSession) { - super(wsSession); + public WsFrameServer(ServletInputStream sis, int binaryBufferSize, + int textBufferSize, WsSession wsSession) { + super(binaryBufferSize, textBufferSize, wsSession); this.sis = sis; } Modified: tomcat/trunk/java/org/apache/tomcat/websocket/server/WsProtocolHandler.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/WsProtocolHandler.java?rev=1438229&r1=1438228&r2=1438229&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/websocket/server/WsProtocolHandler.java (original) +++ tomcat/trunk/java/org/apache/tomcat/websocket/server/WsProtocolHandler.java Thu Jan 24 22:24:37 2013 @@ -29,6 +29,7 @@ import javax.websocket.CloseReason; import javax.websocket.CloseReason.CloseCodes; import javax.websocket.Endpoint; import javax.websocket.EndpointConfiguration; +import javax.websocket.WebSocketContainer; import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; @@ -49,15 +50,20 @@ public class WsProtocolHandler implement private final Endpoint ep; private final EndpointConfiguration endpointConfig; private final ClassLoader applicationClassLoader; + private final int binaryBufferSize; + private final int textBufferSize; private WsSession 
wsSession; public WsProtocolHandler(Endpoint ep, - EndpointConfiguration endpointConfig) { + EndpointConfiguration endpointConfig, + WebSocketContainer wsc) { this.ep = ep; this.endpointConfig = endpointConfig; applicationClassLoader = Thread.currentThread().getContextClassLoader(); + binaryBufferSize = (int) wsc.getMaxBinaryMessageBufferSize(); + textBufferSize = (int) wsc.getMaxTextMessageBufferSize(); } @@ -79,11 +85,12 @@ public class WsProtocolHandler implement ClassLoader cl = t.getContextClassLoader(); t.setContextClassLoader(applicationClassLoader); try { - WsFrameServer wsFrame = new WsFrameServer(sis, wsSession); - sis.setReadListener(new WsReadListener(this, wsFrame)); WsRemoteEndpointServer wsRemoteEndpointServer = new WsRemoteEndpointServer(sos); wsSession = new WsSession(ep, wsRemoteEndpointServer); + WsFrameServer wsFrame = new WsFrameServer( + sis, binaryBufferSize, textBufferSize, wsSession); + sis.setReadListener(new WsReadListener(this, wsFrame)); sos.setWriteListener( new WsWriteListener(this, wsRemoteEndpointServer)); ep.onOpen(wsSession, endpointConfig); @@ -107,13 +114,11 @@ public class WsProtocolHandler implement private void close(CloseReason cr) { - if (wsSession.isOpen()) { - try { - wsSession.close(cr); - } catch (IOException e) { - if (log.isInfoEnabled()) { - log.info(sm.getString("wsProtocolHandler.closeFailed"), e); - } + try { + wsSession.close(cr); + } catch (IOException e) { + if (log.isInfoEnabled()) { + log.info(sm.getString("wsProtocolHandler.closeFailed"), e); } } } Modified: tomcat/trunk/java/org/apache/tomcat/websocket/server/WsServlet.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/WsServlet.java?rev=1438229&r1=1438228&r2=1438229&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/websocket/server/WsServlet.java (original) +++ tomcat/trunk/java/org/apache/tomcat/websocket/server/WsServlet.java Thu Jan 24 
22:24:37 2013 @@ -84,8 +84,8 @@ public class WsServlet extends HttpServl return; } // Need an Endpoint instance to progress this further - ServerContainerImpl cp = ServerContainerImpl.getServerContainer(); - ServerEndpointConfiguration sec = cp.getServerEndpointConfiguration( + ServerContainerImpl sc = ServerContainerImpl.getServerContainer(); + ServerEndpointConfiguration sec = sc.getServerEndpointConfiguration( req.getServletPath(), req.getPathInfo()); // Origin check String origin = req.getHeader("Origin"); @@ -132,7 +132,7 @@ public class WsServlet extends HttpServl } catch (InstantiationException | IllegalAccessException e) { throw new ServletException(e); } - ProtocolHandler wsHandler = new WsProtocolHandler(ep, sec); + ProtocolHandler wsHandler = new WsProtocolHandler(ep, sec, sc); req.upgrade(wsHandler); } Modified: tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java?rev=1438229&r1=1438228&r2=1438229&view=diff ============================================================================== --- tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java (original) +++ tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java Thu Jan 24 22:24:37 2013 @@ -18,6 +18,7 @@ package org.apache.tomcat.websocket; import java.io.IOException; import java.net.URI; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -25,6 +26,7 @@ import java.util.concurrent.TimeUnit; import javax.servlet.ServletContextEvent; import javax.servlet.ServletContextListener; +import javax.websocket.CloseReason; import javax.websocket.ContainerProvider; import javax.websocket.DefaultClientConfiguration; import javax.websocket.Endpoint; @@ -45,6 +47,17 @@ import org.apache.tomcat.websocket.serve public class TestWsWebSocketContainer extends TomcatBaseTest { 
private static final String MESSAGE_STRING_1 = "qwerty"; + private static final String MESSAGE_TEXT_4K; + private static final byte[] MESSAGE_BINARY_4K = new byte[4096]; + + static { + StringBuilder sb = new StringBuilder(4096); + for (int i = 0; i < 4096; i++) { + sb.append('*'); + } + MESSAGE_TEXT_4K = sb.toString(); + } + @Test public void testConnectToServerEndpoint() throws Exception { @@ -59,8 +72,9 @@ public class TestWsWebSocketContainer ex WebSocketContainer wsContainer = ContainerProvider.getClientContainer(); Session wsSession = wsContainer.connectToServer(TesterEndpoint.class, new DefaultClientConfiguration(), new URI("http://localhost:" + - getPort() + EchoConfig.PATH)); - TesterMessageHandlerString handler = new TesterMessageHandlerString(1); + getPort() + EchoConfig.PATH_ASYNC)); + CountDownLatch latch = new CountDownLatch(1); + TesterMessageHandlerText handler = new TesterMessageHandlerText(latch); wsSession.addMessageHandler(handler); wsSession.getRemote().sendString(MESSAGE_STRING_1); @@ -73,6 +87,7 @@ public class TestWsWebSocketContainer ex Assert.assertEquals(MESSAGE_STRING_1, messages.get(0)); } + @Test(expected=javax.websocket.DeploymentException.class) public void testConnectToServerEndpointInvalidScheme() throws Exception { Tomcat tomcat = getTomcatInstance(); @@ -86,9 +101,10 @@ public class TestWsWebSocketContainer ex WebSocketContainer wsContainer = ContainerProvider.getClientContainer(); wsContainer.connectToServer(TesterEndpoint.class, new DefaultClientConfiguration(), new URI("ftp://localhost:" + - getPort() + EchoConfig.PATH)); + getPort() + EchoConfig.PATH_ASYNC)); } + @Test(expected=javax.websocket.DeploymentException.class) public void testConnectToServerEndpointNoHost() throws Exception { Tomcat tomcat = getTomcatInstance(); @@ -102,57 +118,224 @@ public class TestWsWebSocketContainer ex WebSocketContainer wsContainer = ContainerProvider.getClientContainer(); wsContainer.connectToServer(TesterEndpoint.class, new 
DefaultClientConfiguration(), - new URI("http://" + EchoConfig.PATH)); + new URI("http://" + EchoConfig.PATH_ASYNC)); } - private static class TesterMessageHandlerString - implements MessageHandler.Basic<String> { - private final CountDownLatch latch; + @Test + public void testSmallTextBufferClientTextMessage() throws Exception { + doBufferTest(true, false, true, false); + } + + + @Test + public void testSmallTextBufferClientBinaryMessage() throws Exception { + doBufferTest(true, false, false, true); + } + + + @Test + public void testSmallTextBufferServerTextMessage() throws Exception { + doBufferTest(true, true, true, false); + } + - private volatile List<String> messages = new ArrayList<>(); + @Test + public void testSmallTextBufferServerBinaryMessage() throws Exception { + doBufferTest(true, true, false, true); + } + + + @Test + public void testSmallBinaryBufferClientTextMessage() throws Exception { + doBufferTest(false, false, true, false); + } + + + @Test + public void testSmallBinaryBufferClientBinaryMessage() throws Exception { + doBufferTest(false, false, false, false); + } + + + @Test + public void testSmallBinaryBufferServerTextMessage() throws Exception { + doBufferTest(false, true, true, false); + } + + + @Test + public void testSmallBinaryBufferServerBinaryMessage() throws Exception { + doBufferTest(false, true, false, false); + } - public TesterMessageHandlerString(int latchCount) { - if (latchCount > 0) { - latch = new CountDownLatch(latchCount); + + private void doBufferTest(boolean isTextBuffer, boolean isServerBuffer, + boolean isTextMessage, boolean pass) throws Exception { + + Tomcat tomcat = getTomcatInstance(); + // Must have a real docBase - just use temp + Context ctx = + tomcat.addContext("", System.getProperty("java.io.tmpdir")); + ctx.addApplicationListener(EchoConfig.class.getName()); + + WebSocketContainer wsContainer = ContainerProvider.getClientContainer(); + + if (isServerBuffer) { + if (isTextBuffer) { + ctx.addParameter( + 
org.apache.tomcat.websocket.server.Constants. + TEXT_BUFFER_SIZE_SERVLET_CONTEXT_INIT_PARAM, + "1024"); + } else { + ctx.addParameter( + org.apache.tomcat.websocket.server.Constants. + BINARY_BUFFER_SIZE_SERVLET_CONTEXT_INIT_PARAM, + "1024"); + } + } else { + if (isTextBuffer) { + wsContainer.setMaxTextMessageBufferSize(1024); } else { - latch = null; + wsContainer.setMaxBinaryMessageBufferSize(1024); } } - public List<String> getMessages() { - return messages; + tomcat.start(); + + Session wsSession = wsContainer.connectToServer(TesterEndpoint.class, + new DefaultClientConfiguration(), new URI("http://localhost:" + + getPort() + EchoConfig.PATH_BASIC)); + TesterMessageHandler<?> handler; + CountDownLatch latch = new CountDownLatch(1); + wsSession.getUserProperties().put("latch", latch); + if (isTextMessage) { + handler = new TesterMessageHandlerText(latch); + } else { + handler = new TesterMessageHandlerBinary(latch); + } + + wsSession.addMessageHandler(handler); + if (isTextMessage) { + wsSession.getRemote().sendString(MESSAGE_TEXT_4K); + } else { + wsSession.getRemote().sendBytes(ByteBuffer.wrap(MESSAGE_BINARY_4K)); + } + + boolean latchResult = handler.getLatch().await(100, TimeUnit.SECONDS); + + Assert.assertTrue(latchResult); + + List<?> messages = handler.getMessages(); + if (pass) { + Assert.assertEquals(1, messages.size()); + if (isTextMessage) { + Assert.assertEquals(MESSAGE_TEXT_4K, messages.get(0)); + } else { + Assert.assertEquals(ByteBuffer.wrap(MESSAGE_BINARY_4K), + messages.get(0)); + } + } else { + Assert.assertFalse(wsSession.isOpen()); + } + } + + private abstract static class TesterMessageHandler<T> + implements MessageHandler.Basic<T> { + + private final CountDownLatch latch; + + private volatile List<T> messages = new ArrayList<>(); + + public TesterMessageHandler(CountDownLatch latch) { + this.latch = latch; } public CountDownLatch getLatch() { return latch; } + public List<T> getMessages() { + return messages; + } + } + + private static 
class TesterMessageHandlerText + extends TesterMessageHandler<String> { + + + public TesterMessageHandlerText(CountDownLatch latch) { + super(latch); + } + @Override public void onMessage(String message) { - if (latch != null) { - latch.countDown(); + getMessages().add(message); + if (getLatch() != null) { + getLatch().countDown(); + } + } + } + + + private static class TesterMessageHandlerBinary + extends TesterMessageHandler<ByteBuffer> { + + public TesterMessageHandlerBinary(CountDownLatch latch) { + super(latch); + } + + @Override + public void onMessage(ByteBuffer message) { + getMessages().add(message); + if (getLatch() != null) { + getLatch().countDown(); } - messages.add(message); } } + public static class TesterEndpoint extends Endpoint { - @Override + @Override + public void onClose(Session session, CloseReason closeReason) { + clearLatch(session); + } + + @Override + public void onError(Session session, Throwable throwable) { + clearLatch(session); + } + + private void clearLatch(Session session) { + CountDownLatch latch = + (CountDownLatch) session.getUserProperties().get("latch"); + if (latch != null) { + while (latch.getCount() > 0) { + latch.countDown(); + } + } + } + + @Override public void onOpen(Session session, EndpointConfiguration config) { // NO-OP } } + public static class EchoConfig implements ServletContextListener { - public static final String PATH = "/echo"; + public static final String PATH_ASYNC = "/echoAsync"; + public static final String PATH_BASIC = "/echoBasic"; @Override public void contextInitialized(ServletContextEvent sce) { ServerContainerImpl sc = ServerContainerImpl.getServerContainer(); - sc.publishServer(Echo.class, sce.getServletContext(), PATH); + sc.publishServer( + EchoAsync.class, sce.getServletContext(), PATH_ASYNC); + sc.publishServer( + EchoBasic.class, sce.getServletContext(), PATH_BASIC); } @Override @@ -161,7 +344,39 @@ public class TestWsWebSocketContainer ex } } - public static class Echo { + + public static 
class EchoBasic { + @WebSocketMessage + public void echoTextMessage(Session session, String msg) { + try { + session.getRemote().sendString(msg); + } catch (IOException e) { + try { + session.close(); + } catch (IOException e1) { + // Ignore + } + } + } + + + @WebSocketMessage + public void echoBinaryMessage(Session session, ByteBuffer msg) { + try { + session.getRemote().sendBytes(msg); + } catch (IOException e) { + try { + session.close(); + } catch (IOException e1) { + // Ignore + } + } + } + } + + + public static class EchoAsync { + @WebSocketMessage public void echoTextMessage(Session session, String msg, boolean last) { try { @@ -174,5 +389,20 @@ public class TestWsWebSocketContainer ex } } } + + + @WebSocketMessage + public void echoBinaryMessage(Session session, ByteBuffer msg, + boolean last) { + try { + session.getRemote().sendPartialBytes(msg, last); + } catch (IOException e) { + try { + session.close(); + } catch (IOException e1) { + // Ignore + } + } + } } } Modified: tomcat/trunk/webapps/examples/WEB-INF/classes/websocket/echo/WsConfigListener.java URL: http://svn.apache.org/viewvc/tomcat/trunk/webapps/examples/WEB-INF/classes/websocket/echo/WsConfigListener.java?rev=1438229&r1=1438228&r2=1438229&view=diff ============================================================================== --- tomcat/trunk/webapps/examples/WEB-INF/classes/websocket/echo/WsConfigListener.java (original) +++ tomcat/trunk/webapps/examples/WEB-INF/classes/websocket/echo/WsConfigListener.java Thu Jan 24 22:24:37 2013 @@ -39,12 +39,6 @@ public class WsConfigListener implements } catch (DeploymentException e) { throw new IllegalStateException(e); } - - String strReadBufferSize = - servletContext.getInitParameter("wsReadBufferSize"); - if (strReadBufferSize != null) { - sc.setReadBufferSize(Integer.valueOf(strReadBufferSize).intValue()); - } } Modified: tomcat/trunk/webapps/examples/WEB-INF/web.xml URL: 
http://svn.apache.org/viewvc/tomcat/trunk/webapps/examples/WEB-INF/web.xml?rev=1438229&r1=1438228&r2=1438229&view=diff ============================================================================== --- tomcat/trunk/webapps/examples/WEB-INF/web.xml (original) +++ tomcat/trunk/webapps/examples/WEB-INF/web.xml Thu Jan 24 22:24:37 2013 @@ -353,13 +353,4 @@ <servlet-name>stock</servlet-name> <url-pattern>/async/stockticker</url-pattern> </servlet-mapping> - - <!-- Uncomment the section below to increase the WebSocket read buffer --> - <!-- size from the default of 8k to 512k --> - <!-- - <context-param> - <param-name>wsReadBufferSize</param-name> - <param-value>524288</param-value> - </context-param> - --> </web-app> --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org