Author: markt
Date: Wed Nov 20 13:51:34 2013
New Revision: 1543815

URL: http://svn.apache.org/r1543815
Log:
Fix https://issues.apache.org/bugzilla/show_bug.cgi?id=55799
Implement the restriction required by the JSR356 specification that only one message can be written to a remote endpoint at a time.

Modified:
    tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties
    tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java

Modified: tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties?rev=1543815&r1=1543814&r2=1543815&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties Wed Nov 20 13:51:34 2013
@@ -60,6 +60,7 @@ wsRemoteEndpoint.flushOnCloseFailed=Flus
 wsRemoteEndpoint.inProgress=Message will not be sent because the WebSocket session is currently sending another message
 wsRemoteEndpoint.invalidEncoder=The specified encoder of type [{0}] could not be instantiated
 wsRemoteEndpoint.noEncoder=No encoder specified for object of class [{0}]
+wsRemoteEndpoint.wrongState=Remote endpoint was in state [{0}] but state [{1}] is required for this action
 
 # Note the following message is used as a close reason in a WebSocket control
 # frame and therefore must be 123 bytes (not characters) or less in length.

Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java?rev=1543815&r1=1543814&r2=1543815&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java Wed Nov 20 13:51:34 2013
@@ -59,6 +59,8 @@ public abstract class WsRemoteEndpointIm
 
     private final Log log = LogFactory.getLog(WsRemoteEndpointImplBase.class);
 
+    private final StateMachine stateMachine = new StateMachine();
+
     private boolean messagePartInProgress = false;
     private final Queue<MessagePart> messagePartQueue = new ArrayDeque<>();
     private final Object messagePartLock = new Object();
@@ -114,7 +116,12 @@ public abstract class WsRemoteEndpointIm
 
 
     public void sendBytes(ByteBuffer data) throws IOException {
-        startMessageBlock(Constants.OPCODE_BINARY, data, true);
+        stateMachine.binaryStart();
+        try {
+            startMessageBlock(Constants.OPCODE_BINARY, data, true);
+        } finally {
+            stateMachine.complete(true);
+        }
     }
 
 
@@ -126,13 +133,20 @@ public abstract class WsRemoteEndpointIm
 
 
     public void sendBytesByCompletion(ByteBuffer data, SendHandler handler) {
-        startMessage(Constants.OPCODE_BINARY, data, true, handler);
+        StateUpdateSendHandler sush = new StateUpdateSendHandler(handler);
+        stateMachine.binaryStart();
+        startMessage(Constants.OPCODE_BINARY, data, true, sush);
     }
 
 
     public void sendPartialBytes(ByteBuffer partialByte, boolean last)
             throws IOException {
-        startMessageBlock(Constants.OPCODE_BINARY, partialByte, last);
+        stateMachine.binaryPartialStart();
+        try {
+            startMessageBlock(Constants.OPCODE_BINARY, partialByte, last);
+        } finally {
+            stateMachine.complete(last);
+        }
     }
 
 
@@ -151,6 +165,7 @@ public abstract class WsRemoteEndpointIm
 
 
     public void sendString(String text) throws IOException {
+        stateMachine.textStart();
         sendPartialString(CharBuffer.wrap(text), true);
     }
 
@@ -163,24 +178,29 @@ public abstract class WsRemoteEndpointIm
 
 
     public void sendStringByCompletion(String text, SendHandler handler) {
+        stateMachine.textStart();
         TextMessageSendHandler tmsh = new TextMessageSendHandler(handler,
                 CharBuffer.wrap(text), true, encoder, encoderBuffer, this);
         tmsh.write();
+        // TextMessageSendHandler will update stateMachine when it completes
     }
 
 
     public void sendPartialString(String fragment, boolean isLast)
             throws IOException {
+        stateMachine.textPartialStart();
         sendPartialString(CharBuffer.wrap(fragment), isLast);
     }
 
 
     public OutputStream getSendStream() {
+        stateMachine.streamStart();
         return new WsOutputStream(this);
     }
 
 
     public Writer getSendWriter() {
+        stateMachine.writeStart();
         return new WsWriter(this);
     }
 
@@ -634,6 +654,7 @@ public abstract class WsRemoteEndpointIm
         @Override
         public void onResult(SendResult result) {
             if (isDone || !result.isOK()) {
+                endpoint.stateMachine.complete(isLast);
                 handler.onResult(result);
             } else {
                 write();
@@ -915,4 +936,103 @@ public abstract class WsRemoteEndpointIm
             return encoder;
         }
     }
+
+
+    private static enum State {
+        OPEN,
+        STREAM_WRITING,
+        WRITER_WRITING,
+        BINARY_PARTIAL_WRITING,
+        BINARY_PARTIAL_READY,
+        BINARY_FULL_WRITING,
+        TEXT_PARTIAL_WRITING,
+        TEXT_PARTIAL_READY,
+        TEXT_FULL_WRITING
+    }
+
+
+    private static class StateMachine {
+        private State state = State.OPEN;
+
+        public synchronized void streamStart() {
+            checkState(State.OPEN);
+            state = State.STREAM_WRITING;
+        }
+
+        public synchronized void writeStart() {
+            checkState(State.OPEN);
+            state = State.WRITER_WRITING;
+        }
+
+        public synchronized void binaryPartialStart() {
+            checkState(State.OPEN, State.BINARY_PARTIAL_READY);
+            state = State.BINARY_PARTIAL_WRITING;
+        }
+
+        public synchronized void binaryStart() {
+            checkState(State.OPEN);
+            state = State.BINARY_FULL_WRITING;
+        }
+
+        public synchronized void textPartialStart() {
+            checkState(State.OPEN, State.TEXT_PARTIAL_READY);
+            state = State.TEXT_PARTIAL_WRITING;
+        }
+
+        public synchronized void textStart() {
+            checkState(State.OPEN);
+            state = State.TEXT_FULL_WRITING;
+        }
+
+        public synchronized void complete(boolean last) {
+            if (last) {
+                checkState(State.TEXT_PARTIAL_WRITING, State.TEXT_FULL_WRITING,
+                        State.BINARY_PARTIAL_WRITING, State.BINARY_FULL_WRITING,
+                        State.STREAM_WRITING, State.WRITER_WRITING);
+                state = State.OPEN;
+            } else {
+                checkState(State.TEXT_PARTIAL_WRITING, State.BINARY_PARTIAL_WRITING,
+                        State.STREAM_WRITING, State.WRITER_WRITING);
+                if (state == State.TEXT_PARTIAL_WRITING) {
+                    state = State.TEXT_PARTIAL_READY;
+                } else if (state == State.BINARY_PARTIAL_WRITING){
+                    state = State.BINARY_PARTIAL_READY;
+                } else if (state == State.WRITER_WRITING) {
+                    // NO-OP. Leave state as is.
+                } else if (state == State.STREAM_WRITING) {
+                    // NO-OP. Leave state as is.
+                } else {
+                    // Should never happen
+                    // TODO Better message
+                    throw new IllegalStateException();
+                }
+            }
+        }
+
+        private void checkState(State... required) {
+            for (State state : required) {
+                if (this.state == state) {
+                    return;
+                }
+            }
+            // TODO Better (well, any) message
+            throw new IllegalStateException();
+        }
+    }
+
+
+    private class StateUpdateSendHandler implements SendHandler {
+
+        private final SendHandler handler;
+
+        public StateUpdateSendHandler(SendHandler handler) {
+            this.handler = handler;
+        }
+
+        @Override
+        public void onResult(SendResult result) {
+            stateMachine.complete(true);
+            handler.onResult(result);
+        }
+    }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org