Author: markt Date: Thu Feb 23 00:40:02 2012 New Revision: 1292598 URL: http://svn.apache.org/viewvc?rev=1292598&view=rev Log: Refactor
Added: tomcat/trunk/java/org/apache/catalina/websocket/WsFrame.java - copied, changed from r1292498, tomcat/trunk/java/org/apache/catalina/websocket/WsInputStream.java Removed: tomcat/trunk/java/org/apache/catalina/websocket/WsFrameHeader.java Modified: tomcat/trunk/java/org/apache/catalina/websocket/StreamInbound.java tomcat/trunk/java/org/apache/catalina/websocket/WsInputStream.java tomcat/trunk/java/org/apache/catalina/websocket/WsOutbound.java Modified: tomcat/trunk/java/org/apache/catalina/websocket/StreamInbound.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/websocket/StreamInbound.java?rev=1292598&r1=1292597&r2=1292598&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/websocket/StreamInbound.java (original) +++ tomcat/trunk/java/org/apache/catalina/websocket/StreamInbound.java Thu Feb 23 00:40:02 2012 @@ -20,7 +20,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.Reader; -import java.nio.ByteBuffer; import org.apache.coyote.http11.upgrade.UpgradeInbound; import org.apache.coyote.http11.upgrade.UpgradeOutbound; @@ -55,15 +54,15 @@ public abstract class StreamInbound impl try { WsInputStream wsIs = new WsInputStream(processor); - WsFrameHeader header = wsIs.getFrameHeader(); + WsFrame frame = wsIs.getFrame(); // TODO User defined extensions may define values for rsv - if (header.getRsv() > 0) { + if (frame.getRsv() > 0) { getOutbound().close(1002, null); return SocketState.CLOSED; } - byte opCode = header.getOpCode(); + byte opCode = frame.getOpCode(); if (opCode == Constants.OPCODE_BINARY) { onBinaryData(wsIs); @@ -75,22 +74,14 @@ public abstract class StreamInbound impl return SocketState.UPGRADED; } - // Must be a control frame and control frames: - // - have a limited payload length - // - must not be fragmented - if (wsIs.getPayloadLength() > 125 || !wsIs.getFrameHeader().getFin()) { - getOutbound().close(1002, null); - return SocketState.CLOSED; - } - if (opCode == Constants.OPCODE_CLOSE){ - doClose(wsIs); + doClose(frame); return SocketState.CLOSED; } else if (opCode == Constants.OPCODE_PING) { - doPing(wsIs); + doPing(frame); return SocketState.UPGRADED; } else if (opCode == Constants.OPCODE_PONG) { - doPong(wsIs); + // NO-OP return SocketState.UPGRADED; } @@ -105,70 +96,23 @@ public abstract class StreamInbound impl } } - private void doClose(WsInputStream is) throws IOException { - // Control messages have a max size of 125 bytes. Need to try and read - // one more so we reach end of stream (less 2 for the status). Note that - // the 125 byte limit is enforced in #onData() before this method is - // ever called. - ByteBuffer data = null; - - int status = is.read(); - if (status != -1) { - status = status << 8; - int i = is.read(); - if (i == -1) { - // EOF during middle of close message. Closing anyway but set - // close code to protocol error - status = 1002; - } else { - status = status + i; - if (is.getPayloadLength() > 2) { - data = ByteBuffer.allocate((int) is.getPayloadLength() - 1); - int read = 0; - while (read > -1) { - data.position(data.position() + read); - read = is.read(data.array(), data.position(), - data.remaining()); - } - data.flip(); - } - } + private void doClose(WsFrame frame) throws IOException { + if (frame.getPayLoadLength() > 0) { + // Must be status (2 bytes) plus optional message + if (frame.getPayLoadLength() == 1) { + throw new IOException(); + } + int status = (frame.getPayLoad().get() & 0xFF) << 8; + status += frame.getPayLoad().get() & 0xFF; + getOutbound().close(status, frame.getPayLoad()); } else { - status = 0; - } - getOutbound().close(status, data); - } - - private void doPing(WsInputStream is) throws IOException { - // Control messages have a max size of 125 bytes. Need to try and read - // one more so we reach end of stream. Note that the 125 byte limit is - // enforced in #onData() before this method is ever called. - ByteBuffer data = null; - - if (is.getPayloadLength() > 0) { - data = ByteBuffer.allocate((int) is.getPayloadLength() + 1); - - int read = 0; - while (read > -1) { - data.position(data.position() + read); - read = is.read(data.array(), data.position(), data.remaining()); - } - - data.flip(); + // No status + getOutbound().close(0, null); } - - getOutbound().pong(data); } - private void doPong(WsInputStream is) throws IOException { - // Unsolicited pong - swallow it - // Control messages have a max size of 125 bytes. Note that the 125 byte - // limit is enforced in #onData() before this method is ever called so - // the loop below is not unbounded. - int read = 0; - while (read > -1) { - read = is.read(); - } + private void doPing(WsFrame frame) throws IOException { + getOutbound().pong(frame.getPayLoad()); } protected abstract void onBinaryData(InputStream is) throws IOException; Copied: tomcat/trunk/java/org/apache/catalina/websocket/WsFrame.java (from r1292498, tomcat/trunk/java/org/apache/catalina/websocket/WsInputStream.java) URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/websocket/WsFrame.java?p2=tomcat/trunk/java/org/apache/catalina/websocket/WsFrame.java&p1=tomcat/trunk/java/org/apache/catalina/websocket/WsInputStream.java&r1=1292498&r2=1292598&rev=1292598&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/websocket/WsInputStream.java (original) +++ tomcat/trunk/java/org/apache/catalina/websocket/WsFrame.java Thu Feb 23 00:40:02 2012 @@ -17,80 +17,102 @@ package org.apache.catalina.websocket; import java.io.IOException; +import java.nio.ByteBuffer; import org.apache.catalina.util.Conversions; import org.apache.coyote.http11.upgrade.UpgradeProcessor; /** - * This class is used to read WebSocket frames from the underlying socket and - * makes the payload available for reading as an {@link InputStream}. It only - * makes the number of bytes declared in the payload length available for - * reading even if more bytes are available from the socket. + * Represents a WebSocket frame with the exception of the payload for + * non-control frames. */ -public class WsInputStream extends java.io.InputStream { +public class WsFrame { - private UpgradeProcessor<?> processor; - private WsFrameHeader wsFrameHeader; - private long payloadLength = -1; + private final boolean fin; + private final int rsv; + private final byte opCode; private int[] mask = new int[4]; + private long payloadLength; + private ByteBuffer payload; + public WsFrame(UpgradeProcessor<?> processor) throws IOException { - private long remaining; - private long readThisFragment; - - public WsInputStream(UpgradeProcessor<?> processor) throws IOException { - this.processor = processor; - - processFrameHeader(); - } - - - private void processFrameHeader() throws IOException { - - // TODO: Per frame extension handling is not currently supported. - - // TODO: Handle control frames between fragments - - this.wsFrameHeader = new WsFrameHeader(processorRead()); + int b = processorRead(processor); + fin = (b & 0x80) > 0; + rsv = (b & 0x70) >>> 4; + opCode = (byte) (b & 0x0F); + b = processorRead(processor); // Client data must be masked - int i = processorRead(); - if ((i & 0x80) == 0) { + if ((b & 0x80) == 0) { // TODO: StringManager / i18n throw new IOException("Client frame not masked"); } - payloadLength = i & 0x7F; + payloadLength = b & 0x7F; if (payloadLength == 126) { byte[] extended = new byte[2]; - processorRead(extended); + processorRead(processor, extended); payloadLength = Conversions.byteArrayToLong(extended); } else if (payloadLength == 127) { byte[] extended = new byte[8]; - processorRead(extended); + processorRead(processor, extended); payloadLength = Conversions.byteArrayToLong(extended); } - remaining = payloadLength; + + boolean isControl = (opCode & 0x08) > 0; + + if (isControl) { + if (payloadLength > 125) { + throw new IOException(); + } + if (!fin) { + throw new IOException(); + } + } for (int j = 0; j < mask.length; j++) { - mask[j] = processor.read() & 0xFF; + mask[j] = processorRead(processor) & 0xFF; + } + + if (isControl) { + // Note: Payload limited to <= 125 bytes by test above + payload = ByteBuffer.allocate((int) payloadLength); + processorRead(processor, payload); + } else { + payload = null; } + } - readThisFragment = 0; + public boolean getFin() { + return fin; } - public WsFrameHeader getFrameHeader() { - return wsFrameHeader; + public int getRsv() { + return rsv; } - public long getPayloadLength() { + public byte getOpCode() { + return opCode; + } + + public int[] getMask() { + return mask; + } + + public long getPayLoadLength() { return payloadLength; } + public ByteBuffer getPayLoad() { + return payload; + } + // ----------------------------------- Guaranteed read methods for processor - private int processorRead() throws IOException { + private int processorRead(UpgradeProcessor<?> processor) + throws IOException { int result = processor.read(); if (result == -1) { // TODO i18n @@ -100,7 +122,8 @@ public class WsInputStream extends java. } - private void processorRead(byte[] bytes) throws IOException { + private void processorRead(UpgradeProcessor<?> processor, byte[] bytes) + throws IOException { int read = 0; int last = 0; while (read < bytes.length) { @@ -113,31 +136,21 @@ public class WsInputStream extends java. } } - // ----------------------------------------------------- InputStream methods - @Override - public int read() throws IOException { - while (remaining == 0 && !getFrameHeader().getFin()) { - // Need more data - process next frame - processFrameHeader(); - - if (getFrameHeader().getOpCode() != Constants.OPCODE_CONTINUATION) { + /* + * Intended to read whole payload. Therefore able to unmask. + */ + private void processorRead(UpgradeProcessor<?> processor, ByteBuffer bb) + throws IOException { + int last = 0; + while (bb.hasRemaining()) { + last = processor.read(); + if (last == -1) { // TODO i18n - throw new IOException("Not a continuation frame"); + throw new IOException("End of stream before end of frame"); } + bb.put((byte) (last ^ mask[bb.position() % 4])); } - - if (remaining == 0) { - return -1; - } - - remaining--; - readThisFragment++; - - int masked = processor.read(); - if(masked == -1) { - return -1; - } - return masked ^ mask[(int) ((readThisFragment - 1) % 4)]; + bb.flip(); } } Modified: tomcat/trunk/java/org/apache/catalina/websocket/WsInputStream.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/websocket/WsInputStream.java?rev=1292598&r1=1292597&r2=1292598&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/websocket/WsInputStream.java (original) +++ tomcat/trunk/java/org/apache/catalina/websocket/WsInputStream.java Thu Feb 23 00:40:02 2012 @@ -18,7 +18,6 @@ package org.apache.catalina.websocket; import java.io.IOException; -import org.apache.catalina.util.Conversions; import org.apache.coyote.http11.upgrade.UpgradeProcessor; /** @@ -30,98 +29,36 @@ import org.apache.coyote.http11.upgrade. public class WsInputStream extends java.io.InputStream { private UpgradeProcessor<?> processor; - private WsFrameHeader wsFrameHeader; - private long payloadLength = -1; - private int[] mask = new int[4]; - + private WsFrame frame; private long remaining; private long readThisFragment; public WsInputStream(UpgradeProcessor<?> processor) throws IOException { this.processor = processor; - - processFrameHeader(); + processFrame(); } - - private void processFrameHeader() throws IOException { - - // TODO: Per frame extension handling is not currently supported. - - // TODO: Handle control frames between fragments - - this.wsFrameHeader = new WsFrameHeader(processorRead()); - - // Client data must be masked - int i = processorRead(); - if ((i & 0x80) == 0) { - // TODO: StringManager / i18n - throw new IOException("Client frame not masked"); - } - - payloadLength = i & 0x7F; - if (payloadLength == 126) { - byte[] extended = new byte[2]; - processorRead(extended); - payloadLength = Conversions.byteArrayToLong(extended); - } else if (payloadLength == 127) { - byte[] extended = new byte[8]; - processorRead(extended); - payloadLength = Conversions.byteArrayToLong(extended); - } - remaining = payloadLength; - - for (int j = 0; j < mask.length; j++) { - mask[j] = processor.read() & 0xFF; - } - - readThisFragment = 0; + public WsFrame getFrame() { + return frame; } - public WsFrameHeader getFrameHeader() { - return wsFrameHeader; - } - - public long getPayloadLength() { - return payloadLength; - } - - - // ----------------------------------- Guaranteed read methods for processor - - private int processorRead() throws IOException { - int result = processor.read(); - if (result == -1) { - // TODO i18n - throw new IOException("End of stream before end of frame"); - } - return result; + private void processFrame() throws IOException { + frame = new WsFrame(processor); + readThisFragment = 0; + remaining = frame.getPayLoadLength(); } - private void processorRead(byte[] bytes) throws IOException { - int read = 0; - int last = 0; - while (read < bytes.length) { - last = processor.read(bytes, read, bytes.length - read); - if (last == -1) { - // TODO i18n - throw new IOException("End of stream before end of frame"); - } - read += last; - } - } - // ----------------------------------------------------- InputStream methods @Override public int read() throws IOException { - while (remaining == 0 && !getFrameHeader().getFin()) { + while (remaining == 0 && !getFrame().getFin()) { // Need more data - process next frame - processFrameHeader(); + processFrame(); - if (getFrameHeader().getOpCode() != Constants.OPCODE_CONTINUATION) { + if (getFrame().getOpCode() != Constants.OPCODE_CONTINUATION) { // TODO i18n throw new IOException("Not a continuation frame"); } @@ -138,6 +75,6 @@ public class WsInputStream extends java. if(masked == -1) { return -1; } - return masked ^ mask[(int) ((readThisFragment - 1) % 4)]; + return masked ^ frame.getMask()[(int) ((readThisFragment - 1) % 4)]; } } Modified: tomcat/trunk/java/org/apache/catalina/websocket/WsOutbound.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/websocket/WsOutbound.java?rev=1292598&r1=1292597&r2=1292598&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/websocket/WsOutbound.java (original) +++ tomcat/trunk/java/org/apache/catalina/websocket/WsOutbound.java Thu Feb 23 00:40:02 2012 @@ -126,15 +126,15 @@ public class WsOutbound { upgradeOutbound.write(0x88); if (status == 0) { upgradeOutbound.write(0); - } else if (data == null) { + } else if (data == null || data.position() == data.limit()) { upgradeOutbound.write(2); upgradeOutbound.write(status >>> 8); upgradeOutbound.write(status); } else { - upgradeOutbound.write(2 + data.limit()); + upgradeOutbound.write(2 + data.limit() - data.position()); upgradeOutbound.write(status >>> 8); upgradeOutbound.write(status); - upgradeOutbound.write(data.array(), 0, data.limit()); + upgradeOutbound.write(data.array(), data.position(), data.limit()); } upgradeOutbound.flush(); --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org