Author: markt Date: Tue Jan 21 19:54:32 2014 New Revision: 1560159 URL: http://svn.apache.org/r1560159 Log: Fix https://issues.apache.org/bugzilla/show_bug.cgi?id=56032 After an IO error sending a WebSocket message, close the connection.
Modified: tomcat/tc7.0.x/trunk/ (props changed) tomcat/tc7.0.x/trunk/java/org/apache/coyote/http11/upgrade/NioServletOutputStream.java tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/WsFrameBase.java tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/WsSession.java tomcat/tc7.0.x/trunk/test/org/apache/tomcat/websocket/TestWebSocketFrameClient.java tomcat/tc7.0.x/trunk/test/org/apache/tomcat/websocket/TesterFirehoseServer.java tomcat/tc7.0.x/trunk/test/org/apache/tomcat/websocket/TesterMessageCountClient.java tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml Propchange: tomcat/tc7.0.x/trunk/ ------------------------------------------------------------------------------ Merged /tomcat/trunk:r1560158 Modified: tomcat/tc7.0.x/trunk/java/org/apache/coyote/http11/upgrade/NioServletOutputStream.java URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/coyote/http11/upgrade/NioServletOutputStream.java?rev=1560159&r1=1560158&r2=1560159&view=diff ============================================================================== --- tomcat/tc7.0.x/trunk/java/org/apache/coyote/http11/upgrade/NioServletOutputStream.java (original) +++ tomcat/tc7.0.x/trunk/java/org/apache/coyote/http11/upgrade/NioServletOutputStream.java Tue Jan 21 19:54:32 2014 @@ -134,6 +134,6 @@ public class NioServletOutputStream exte @Override protected void doClose() throws IOException { - channel.close(); + channel.close(true); } } Modified: tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties?rev=1560159&r1=1560158&r2=1560159&view=diff ============================================================================== --- tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties (original) +++ tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties Tue Jan 21 19:54:32 2014 @@ -43,6 +43,7 @@ wsFrame.controlNoFin=A control frame was wsFrame.invalidOpCode= A WebSocket frame was sent with an unrecognised opCode of [{0}] wsFrame.invalidUtf8=A WebSocket text frame was received that could not be decoded to UTF-8 because it contained invalid byte sequences wsFrame.invalidUtf8Close=A WebSocket close frame was received with a close reason that contained invalid UTF-8 byte sequences +wsFrame.ioeTriggeredClose=An unrecoverable IOException occurred so the connection was closed wsFrame.messageTooBig=The message was [{0}] bytes long but the MessageHandler has a limit of [{1}] bytes wsFrame.noContinuation=A new message was started when a continuation frame was expected wsFrame.notMasked=The client frame was not masked but all client frames must be masked Modified: tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/WsFrameBase.java URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/WsFrameBase.java?rev=1560159&r1=1560158&r2=1560159&view=diff ============================================================================== --- tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/WsFrameBase.java (original) +++ tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/WsFrameBase.java Tue Jan 21 19:54:32 2014 @@ -333,8 +333,7 @@ public abstract class WsFrameBase { try { mhPong.onMessage(new WsPongMessage(controlBufferBinary)); } catch (Throwable t) { - ExceptionUtils.handleThrowable(t); - wsSession.getLocal().onError(wsSession, t); + handleThrowableOnSend(t); } finally { controlBufferBinary.clear(); } @@ -377,8 +376,7 @@ public abstract class WsFrameBase { messageBufferText.toString()); } } catch (Throwable t) { - ExceptionUtils.handleThrowable(t); - wsSession.getLocal().onError(wsSession, t); + handleThrowableOnSend(t); } finally { messageBufferText.clear(); } @@ -533,6 +531,15 @@ public abstract class WsFrameBase { } + private void handleThrowableOnSend(Throwable t) throws WsIOException { + ExceptionUtils.handleThrowable(t); + wsSession.getLocal().onError(wsSession, t); + CloseReason cr = new CloseReason(CloseCodes.CLOSED_ABNORMALLY, + sm.getString("wsFrame.ioeTriggeredClose")); + throw new WsIOException(cr); + } + + @SuppressWarnings("unchecked") private void sendMessageBinary(ByteBuffer msg, boolean last) throws WsIOException { @@ -554,8 +561,7 @@ public abstract class WsFrameBase { ((MessageHandler.Whole<ByteBuffer>) binaryMsgHandler).onMessage(msg); } } catch(Throwable t) { - ExceptionUtils.handleThrowable(t); - wsSession.getLocal().onError(wsSession, t); + handleThrowableOnSend(t); } } Modified: tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/WsSession.java URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/WsSession.java?rev=1560159&r1=1560158&r2=1560159&view=diff ============================================================================== --- tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/WsSession.java (original) +++ tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/WsSession.java Tue Jan 21 19:54:32 2014 @@ -466,7 +466,13 @@ public class WsSession implements Sessio // 125 is maximum size for the payload of a control message ByteBuffer msg = ByteBuffer.allocate(125); CloseCode closeCode = closeReason.getCloseCode(); - msg.putShort((short) closeCode.getCode()); + // CLOSED_ABNORMALLY should not be put on the wire + if (closeCode == CloseCodes.CLOSED_ABNORMALLY) { + // PROTOCOL_ERROR is probably better than GOING_AWAY here + msg.putShort((short) CloseCodes.PROTOCOL_ERROR.getCode()); + } else { + msg.putShort((short) closeCode.getCode()); + } String reason = closeReason.getReasonPhrase(); if (reason != null && reason.length() > 0) { Modified: tomcat/tc7.0.x/trunk/test/org/apache/tomcat/websocket/TestWebSocketFrameClient.java URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/test/org/apache/tomcat/websocket/TestWebSocketFrameClient.java?rev=1560159&r1=1560158&r2=1560159&view=diff ============================================================================== --- tomcat/tc7.0.x/trunk/test/org/apache/tomcat/websocket/TestWebSocketFrameClient.java (original) +++ tomcat/tc7.0.x/trunk/test/org/apache/tomcat/websocket/TestWebSocketFrameClient.java Tue Jan 21 19:54:32 2014 @@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit; import javax.websocket.ClientEndpointConfig; import javax.websocket.ContainerProvider; +import javax.websocket.MessageHandler; import javax.websocket.Session; import javax.websocket.WebSocketContainer; @@ -38,6 +39,7 @@ import org.apache.catalina.startup.Tomca import org.apache.catalina.startup.TomcatBaseTest; import org.apache.tomcat.util.net.TesterSupport; import org.apache.tomcat.websocket.TesterMessageCountClient.BasicText; +import org.apache.tomcat.websocket.TesterMessageCountClient.SleepingText; import org.apache.tomcat.websocket.TesterMessageCountClient.TesterProgrammaticEndpoint; public class TestWebSocketFrameClient extends TomcatBaseTest { @@ -92,4 +94,69 @@ public class TestWebSocketFrameClient ex Assert.assertEquals(TesterFirehoseServer.MESSAGE, message); } } + + + @Test + public void testBug56032() throws Exception { + + Tomcat tomcat = getTomcatInstance(); + // Must have a real docBase - just use temp + Context ctx = + tomcat.addContext("", System.getProperty("java.io.tmpdir")); + ctx.addApplicationListener(new ApplicationListener( + TesterFirehoseServer.Config.class.getName(), false)); + Tomcat.addServlet(ctx, "default", new DefaultServlet()); + ctx.addServletMapping("/", "default"); + + TesterSupport.initSsl(tomcat); + + tomcat.start(); + + WebSocketContainer wsContainer = + ContainerProvider.getWebSocketContainer(); + ClientEndpointConfig clientEndpointConfig = + ClientEndpointConfig.Builder.create().build(); + clientEndpointConfig.getUserProperties().put( + WsWebSocketContainer.SSL_TRUSTSTORE_PROPERTY, + "test/org/apache/tomcat/util/net/ca.jks"); + Session wsSession = wsContainer.connectToServer( + TesterProgrammaticEndpoint.class, + clientEndpointConfig, + new URI("wss://localhost:" + getPort() + + TesterFirehoseServer.Config.PATH)); + + // Process incoming messages very slowly + MessageHandler handler = new SleepingText(5000); + wsSession.addMessageHandler(handler); + wsSession.getBasicRemote().sendText("Hello"); + + // Wait long enough for the buffers to fill and the send to timeout + int count = 0; + int limit = TesterFirehoseServer.WAIT_TIME_MILLIS / 100; + + System.out.println("Waiting for server to report an error"); + while (TesterFirehoseServer.Endpoint.getErrorCount() == 0 && count < limit) { + Thread.sleep(100); + count ++; + } + + if (TesterFirehoseServer.Endpoint.getErrorCount() == 0) { + Assert.fail("No error reported by Endpoint when timeout was expected"); + } + + // Wait up to another 10 seconds for the connection to be closed - + // should be a lot faster. + System.out.println("Waiting for connection to be closed"); + count = 0; + limit = 10000; + while (TesterFirehoseServer.Endpoint.getOpenConnectionCount() != 0 && count < limit) { + Thread.sleep(100); + count ++; + } + + int openConnectionCount = TesterFirehoseServer.Endpoint.getOpenConnectionCount(); + if (openConnectionCount != 0) { + Assert.fail("There are [" + openConnectionCount + "] connections still open"); + } + } } Modified: tomcat/tc7.0.x/trunk/test/org/apache/tomcat/websocket/TesterFirehoseServer.java URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/test/org/apache/tomcat/websocket/TesterFirehoseServer.java?rev=1560159&r1=1560158&r2=1560159&view=diff ============================================================================== --- tomcat/tc7.0.x/trunk/test/org/apache/tomcat/websocket/TesterFirehoseServer.java (original) +++ tomcat/tc7.0.x/trunk/test/org/apache/tomcat/websocket/TesterFirehoseServer.java Tue Jan 21 19:54:32 2014 @@ -17,10 +17,14 @@ package org.apache.tomcat.websocket; import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; import javax.servlet.ServletContextEvent; import javax.websocket.DeploymentException; +import javax.websocket.OnClose; +import javax.websocket.OnError; import javax.websocket.OnMessage; +import javax.websocket.OnOpen; import javax.websocket.RemoteEndpoint.Basic; import javax.websocket.Session; import javax.websocket.server.ServerContainer; @@ -39,6 +43,7 @@ public class TesterFirehoseServer { public static final String MESSAGE; public static final int MESSAGE_SIZE = 1024; public static final int WAIT_TIME_MILLIS = 60000; + public static final int SEND_TIME_OUT_MILLIS = 5000; static { StringBuilder sb = new StringBuilder(MESSAGE_SIZE); @@ -71,8 +76,24 @@ public class TesterFirehoseServer { @ServerEndpoint(Config.PATH) public static class Endpoint { + private static AtomicInteger openConnectionCount = new AtomicInteger(0); + private static AtomicInteger errorCount = new AtomicInteger(0); + private volatile boolean started = false; + public static int getOpenConnectionCount() { + return openConnectionCount.intValue(); + } + + public static int getErrorCount() { + return errorCount.intValue(); + } + + @OnOpen + public void onOpen() { + openConnectionCount.incrementAndGet(); + } + @OnMessage public void onMessage(Session session, @SuppressWarnings("unused") String msg) throws IOException { @@ -90,7 +111,7 @@ public class TesterFirehoseServer { session.getUserProperties().put( "org.apache.tomcat.websocket.BLOCKING_SEND_TIMEOUT", - Long.valueOf(WAIT_TIME_MILLIS)); + Long.valueOf(SEND_TIME_OUT_MILLIS)); Basic remote = session.getBasicRemote(); remote.setBatchingAllowed(true); @@ -102,5 +123,15 @@ public class TesterFirehoseServer { // Ensure remaining messages are flushed remote.setBatchingAllowed(false); } + + @OnError + public void onError(@SuppressWarnings("unused") Throwable t) { + errorCount.incrementAndGet(); + } + + @OnClose + public void onClose() { + openConnectionCount.decrementAndGet(); + } } } Modified: tomcat/tc7.0.x/trunk/test/org/apache/tomcat/websocket/TesterMessageCountClient.java URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/test/org/apache/tomcat/websocket/TesterMessageCountClient.java?rev=1560159&r1=1560158&r2=1560159&view=diff ============================================================================== --- tomcat/tc7.0.x/trunk/test/org/apache/tomcat/websocket/TesterMessageCountClient.java (original) +++ tomcat/tc7.0.x/trunk/test/org/apache/tomcat/websocket/TesterMessageCountClient.java Tue Jan 21 19:54:32 2014 @@ -173,6 +173,24 @@ public class TesterMessageCountClient { } } + public static class SleepingText implements MessageHandler.Whole<String> { + + private final int sleep; + + public SleepingText(int sleep) { + this.sleep = sleep; + } + + @Override + public void onMessage(String message) { + try { + Thread.sleep(sleep); + } catch (InterruptedException e) { + // Ignore + } + } + } + public abstract static class AsyncHandler<T> implements MessageHandler.Partial<T> { Modified: tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml?rev=1560159&r1=1560158&r2=1560159&view=diff ============================================================================== --- tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml (original) +++ tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml Tue Jan 21 19:54:32 2014 @@ -108,6 +108,10 @@ <code>ServerEndpointConfig.Configurator.modifyHandshake()</code>. (markt) </fix> + <fix> + <bug>56032</bug>: Ensure that the WebSocket connection is closed after + an IO error or an interrupt while sending a WebSocket message. (markt) + </fix> </changelog> </subsection> <subsection name="Coyote"> --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org