Author: markt Date: Thu Dec 13 22:56:06 2012 New Revision: 1421602 URL: http://svn.apache.org/viewvc?rev=1421602&view=rev Log: WebSocket 1.0 implementation part 15 of many Implement enough of the send message code that the WebSocket example works again using the new annotation endpoint
Modified: tomcat/trunk/java/javax/websocket/RemoteEndpoint.java tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpoint.java Modified: tomcat/trunk/java/javax/websocket/RemoteEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/javax/websocket/RemoteEndpoint.java?rev=1421602&r1=1421601&r2=1421602&view=diff ============================================================================== --- tomcat/trunk/java/javax/websocket/RemoteEndpoint.java (original) +++ tomcat/trunk/java/javax/websocket/RemoteEndpoint.java Thu Dec 13 22:56:06 2012 @@ -25,8 +25,18 @@ import java.util.concurrent.Future; public interface RemoteEndpoint { + /** + * Send the message, blocking until the message is sent. + * @param text The text message to send. + * @throws IOException + */ void sendString(String text) throws IOException; + /** + * Send the message, blocking until the message is sent. + * @param data The binary message to send + * @throws IOException + */ void sendBytes(ByteBuffer data) throws IOException; void sendPartialString(String fragment, boolean isLast) throws IOException; Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpoint.java?rev=1421602&r1=1421601&r2=1421602&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpoint.java Thu Dec 13 22:56:06 2012 @@ -20,6 +20,12 @@ import java.io.IOException; import java.io.OutputStream; import java.io.Writer; import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.Charset; +import java.nio.charset.CharsetEncoder; +import java.nio.charset.CoderResult; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Future; import 
javax.servlet.ServletOutputStream; @@ -31,38 +37,83 @@ import javax.websocket.SendResult; public class WsRemoteEndpoint implements RemoteEndpoint { private final ServletOutputStream sos; + // Max length for outgoing WebSocket frame header is 10 bytes + private final ByteBuffer header = ByteBuffer.allocate(10); + + private final ByteBuffer textToByte = ByteBuffer.allocate(8192); + private final CharsetEncoder encoder = Charset.forName("UTF8").newEncoder(); + private volatile Boolean isText = null; + private volatile CyclicBarrier writeBarrier = new CyclicBarrier(2); + public WsRemoteEndpoint(ServletOutputStream sos) { this.sos = sos; } - public void onWritePossible() { - // TODO - } @Override public void sendString(String text) throws IOException { - // TODO Auto-generated method stub + if (isText != null) { + // Another message is being sent using fragments + // TODO i18n + throw new IllegalStateException(); + } + sendPartialString(text, true); } @Override public void sendBytes(ByteBuffer data) throws IOException { - // TODO Auto-generated method stub + if (isText != null) { + // Another message is being sent using fragments + // TODO i18n + throw new IllegalStateException(); + } + sendPartialBytes(data, true); } @Override public void sendPartialString(String fragment, boolean isLast) throws IOException { - // TODO Auto-generated method stub + + if (isText != null && !isText.booleanValue()) { + // Can't write a text fragment in the middle of a binary message + // TODO i18n + throw new IllegalStateException(); + } + + boolean first = (isText == null); + encoder.reset(); + textToByte.clear(); + CharBuffer cb = CharBuffer.wrap(fragment); + CoderResult cr = encoder.encode(cb, textToByte, true); + while (cr.isOverflow()) { + sendMessage(Constants.OPCODE_TEXT, textToByte, first, false); + first = false; + } + sendMessage(Constants.OPCODE_TEXT, textToByte, first, isLast); + if (!isLast) { + isText = Boolean.FALSE; + } } @Override public void sendPartialBytes(ByteBuffer 
partialByte, boolean isLast) throws IOException { - // TODO Auto-generated method stub + + if (isText != null && isText.booleanValue()) { + // Can't write a binary fragment in the middle of a text message + // TODO i18n + throw new IllegalStateException(); + } + + boolean first = (isText == null); + sendMessage(Constants.OPCODE_BINARY, partialByte, first, isLast); + if (!isLast) { + isText = Boolean.FALSE; + } } @@ -127,12 +178,92 @@ public class WsRemoteEndpoint implements @Override public void sendPing(ByteBuffer applicationData) { - // TODO Auto-generated method stub + sendMessage(Constants.OPCODE_PING, applicationData, true, true); } @Override public void sendPong(ByteBuffer applicationData) { - // TODO Auto-generated method stub + sendMessage(Constants.OPCODE_PONG, applicationData, true, true); + } + + + public void onWritePossible() { + try { + writeBarrier.await(); + } catch (InterruptedException | BrokenBarrierException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + + private void sendMessage(byte opCode, ByteBuffer message, + boolean isFirstFragment, boolean isLastFragment) { + // Clear header, ready for new message + header.clear(); + byte first = 0; + + if (isLastFragment) { + // Set the fin bit + first = -128; + } + + if (isFirstFragment) { + // This is the first fragment of this message + first = (byte) (first + opCode); + } + // If not the first fragment, it is a continuation with opCode of zero + + message.flip(); + header.put(first); + + // Next write the length + if (message.limit() < 126) { + header.put((byte) message.limit()); + } else if (message.limit() < 65536) { + header.put((byte) 126); + header.put((byte) (message.limit() >>> 8)); + header.put((byte) (message.limit() & 0xFF)); + } else { + // Will never be more than 2^31-1 + header.put((byte) 127); + header.put((byte) 0); + header.put((byte) 0); + header.put((byte) 0); + header.put((byte) 0); + header.put((byte) (message.limit() >>> 24)); + header.put((byte) 
(message.limit() >>> 16)); + header.put((byte) (message.limit() >>> 8)); + header.put((byte) (message.limit() & 0xFF)); + } + header.flip(); + + doBlockingWrite(header); + doBlockingWrite(message); + try { + sos.flush(); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + + private void doBlockingWrite(ByteBuffer data) { + if (!sos.canWrite()) { + try { + writeBarrier.await(); + } catch (InterruptedException | BrokenBarrierException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + try { + sos.write(data.array(), data.arrayOffset(), data.limit()); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org