Author: markt
Date: Thu Feb 23 00:40:02 2012
New Revision: 1292598

URL: http://svn.apache.org/viewvc?rev=1292598&view=rev
Log:
Refactor

Added:
    tomcat/trunk/java/org/apache/catalina/websocket/WsFrame.java
      - copied, changed from r1292498, tomcat/trunk/java/org/apache/catalina/websocket/WsInputStream.java
Removed:
    tomcat/trunk/java/org/apache/catalina/websocket/WsFrameHeader.java
Modified:
    tomcat/trunk/java/org/apache/catalina/websocket/StreamInbound.java
    tomcat/trunk/java/org/apache/catalina/websocket/WsInputStream.java
    tomcat/trunk/java/org/apache/catalina/websocket/WsOutbound.java

Modified: tomcat/trunk/java/org/apache/catalina/websocket/StreamInbound.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/websocket/StreamInbound.java?rev=1292598&r1=1292597&r2=1292598&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/websocket/StreamInbound.java (original)
+++ tomcat/trunk/java/org/apache/catalina/websocket/StreamInbound.java Thu Feb 23 00:40:02 2012
@@ -20,7 +20,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.Reader;
-import java.nio.ByteBuffer;
 
 import org.apache.coyote.http11.upgrade.UpgradeInbound;
 import org.apache.coyote.http11.upgrade.UpgradeOutbound;
@@ -55,15 +54,15 @@ public abstract class StreamInbound impl
         try {
             WsInputStream wsIs = new WsInputStream(processor);
 
-            WsFrameHeader header = wsIs.getFrameHeader();
+            WsFrame frame = wsIs.getFrame();
 
             // TODO User defined extensions may define values for rsv
-            if (header.getRsv() > 0) {
+            if (frame.getRsv() > 0) {
                 getOutbound().close(1002, null);
                 return SocketState.CLOSED;
             }
 
-            byte opCode = header.getOpCode();
+            byte opCode = frame.getOpCode();
 
             if (opCode == Constants.OPCODE_BINARY) {
                 onBinaryData(wsIs);
@@ -75,22 +74,14 @@ public abstract class StreamInbound impl
                 return SocketState.UPGRADED;
             }
 
-            // Must be a control frame and control frames:
-            // - have a limited payload length
-            // - must not be fragmented
-            if (wsIs.getPayloadLength() > 125 || 
!wsIs.getFrameHeader().getFin()) {
-                getOutbound().close(1002, null);
-                return SocketState.CLOSED;
-            }
-
             if (opCode == Constants.OPCODE_CLOSE){
-                doClose(wsIs);
+                doClose(frame);
                 return SocketState.CLOSED;
             } else if (opCode == Constants.OPCODE_PING) {
-                doPing(wsIs);
+                doPing(frame);
                 return SocketState.UPGRADED;
             } else if (opCode == Constants.OPCODE_PONG) {
-                doPong(wsIs);
+                // NO-OP
                 return SocketState.UPGRADED;
             }
 
@@ -105,70 +96,23 @@ public abstract class StreamInbound impl
         }
     }
 
-    private void doClose(WsInputStream is) throws IOException {
-        // Control messages have a max size of 125 bytes. Need to try and read
-        // one more so we reach end of stream (less 2 for the status). Note 
that
-        // the 125 byte limit is enforced in #onData() before this method is
-        // ever called.
-        ByteBuffer data = null;
-
-        int status = is.read();
-        if (status != -1) {
-            status = status << 8;
-            int i = is.read();
-            if (i == -1) {
-                // EOF during middle of close message. Closing anyway but set
-                // close code to protocol error
-                status = 1002;
-            } else {
-                status = status + i;
-                if (is.getPayloadLength() > 2) {
-                    data = ByteBuffer.allocate((int) is.getPayloadLength() - 
1);
-                    int read = 0;
-                    while (read > -1) {
-                        data.position(data.position() + read);
-                        read = is.read(data.array(), data.position(),
-                                data.remaining());
-                    }
-                    data.flip();
-                }
-            }
+    private void doClose(WsFrame frame) throws IOException {
+        if (frame.getPayLoadLength() > 0) {
+            // Must be status (2 bytes) plus optional message
+            if (frame.getPayLoadLength() == 1) {
+                throw new IOException();
+            }
+            int status = (frame.getPayLoad().get() & 0xFF) << 8;
+            status += frame.getPayLoad().get() & 0xFF;
+            getOutbound().close(status, frame.getPayLoad());
         } else {
-            status = 0;
-        }
-        getOutbound().close(status, data);
-    }
-
-    private void doPing(WsInputStream is) throws IOException {
-        // Control messages have a max size of 125 bytes. Need to try and read
-        // one more so we reach end of stream. Note that the 125 byte limit is
-        // enforced in #onData() before this method is ever called.
-        ByteBuffer data = null;
-
-        if (is.getPayloadLength() > 0) {
-            data = ByteBuffer.allocate((int) is.getPayloadLength() + 1);
-
-            int read = 0;
-            while (read > -1) {
-                data.position(data.position() + read);
-                read = is.read(data.array(), data.position(), 
data.remaining());
-            }
-
-            data.flip();
+            // No status
+            getOutbound().close(0, null);
         }
-
-        getOutbound().pong(data);
     }
 
-    private void doPong(WsInputStream is) throws IOException {
-        // Unsolicited pong - swallow it
-        // Control messages have a max size of 125 bytes. Note that the 125 
byte
-        // limit is enforced in #onData() before this method is ever called so
-        // the loop below is not unbounded.
-        int read = 0;
-        while (read > -1) {
-            read = is.read();
-        }
+    private void doPing(WsFrame frame) throws IOException {
+        getOutbound().pong(frame.getPayLoad());
     }
 
     protected abstract void onBinaryData(InputStream is) throws IOException;

Copied: tomcat/trunk/java/org/apache/catalina/websocket/WsFrame.java (from r1292498, tomcat/trunk/java/org/apache/catalina/websocket/WsInputStream.java)
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/websocket/WsFrame.java?p2=tomcat/trunk/java/org/apache/catalina/websocket/WsFrame.java&p1=tomcat/trunk/java/org/apache/catalina/websocket/WsInputStream.java&r1=1292498&r2=1292598&rev=1292598&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/websocket/WsInputStream.java (original)
+++ tomcat/trunk/java/org/apache/catalina/websocket/WsFrame.java Thu Feb 23 00:40:02 2012
@@ -17,80 +17,102 @@
 package org.apache.catalina.websocket;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import org.apache.catalina.util.Conversions;
 import org.apache.coyote.http11.upgrade.UpgradeProcessor;
 
 /**
- * This class is used to read WebSocket frames from the underlying socket and
- * makes the payload available for reading as an {@link InputStream}. It only
- * makes the number of bytes declared in the payload length available for
- * reading even if more bytes are available from the socket.
+ * Represents a WebSocket frame with the exception of the payload for
+ * non-control frames.
  */
-public class WsInputStream extends java.io.InputStream {
+public class WsFrame {
 
-    private UpgradeProcessor<?> processor;
-    private WsFrameHeader wsFrameHeader;
-    private long payloadLength = -1;
+    private final boolean fin;
+    private final int rsv;
+    private final byte opCode;
     private int[] mask = new int[4];
+    private long payloadLength;
+    private ByteBuffer payload;
 
+    public WsFrame(UpgradeProcessor<?> processor) throws IOException {
 
-    private long remaining;
-    private long readThisFragment;
-
-    public WsInputStream(UpgradeProcessor<?> processor) throws IOException {
-        this.processor = processor;
-
-        processFrameHeader();
-    }
-
-
-    private void processFrameHeader() throws IOException {
-
-        // TODO: Per frame extension handling is not currently supported.
-
-        // TODO: Handle control frames between fragments
-
-        this.wsFrameHeader = new WsFrameHeader(processorRead());
+        int b = processorRead(processor);
+        fin = (b & 0x80) > 0;
+        rsv = (b & 0x70) >>> 4;
+        opCode = (byte) (b & 0x0F);
 
+        b = processorRead(processor);
         // Client data must be masked
-        int i = processorRead();
-        if ((i & 0x80) == 0) {
+        if ((b & 0x80) == 0) {
             // TODO: StringManager / i18n
             throw new IOException("Client frame not masked");
         }
 
-        payloadLength = i & 0x7F;
+        payloadLength = b & 0x7F;
         if (payloadLength == 126) {
             byte[] extended = new byte[2];
-            processorRead(extended);
+            processorRead(processor, extended);
             payloadLength = Conversions.byteArrayToLong(extended);
         } else if (payloadLength == 127) {
             byte[] extended = new byte[8];
-            processorRead(extended);
+            processorRead(processor, extended);
             payloadLength = Conversions.byteArrayToLong(extended);
         }
-        remaining = payloadLength;
+
+        boolean isControl = (opCode & 0x08) > 0;
+
+        if (isControl) {
+            if (payloadLength > 125) {
+                throw new IOException();
+            }
+            if (!fin) {
+                throw new IOException();
+            }
+        }
 
         for (int j = 0; j < mask.length; j++) {
-            mask[j] = processor.read() & 0xFF;
+            mask[j] = processorRead(processor) & 0xFF;
+        }
+
+        if (isControl) {
+            // Note: Payload limited to <= 125 bytes by test above
+            payload = ByteBuffer.allocate((int) payloadLength);
+            processorRead(processor, payload);
+        } else {
+            payload = null;
         }
+    }
 
-        readThisFragment = 0;
+    public boolean getFin() {
+        return fin;
     }
 
-    public WsFrameHeader getFrameHeader() {
-        return wsFrameHeader;
+    public int getRsv() {
+        return rsv;
     }
 
-    public long getPayloadLength() {
+    public byte getOpCode() {
+        return opCode;
+    }
+
+    public int[] getMask() {
+        return mask;
+    }
+
+    public long getPayLoadLength() {
         return payloadLength;
     }
 
+    public ByteBuffer getPayLoad() {
+        return payload;
+    }
+
 
     // ----------------------------------- Guaranteed read methods for 
processor
 
-    private int processorRead() throws IOException {
+    private int processorRead(UpgradeProcessor<?> processor)
+            throws IOException {
         int result = processor.read();
         if (result == -1) {
             // TODO i18n
@@ -100,7 +122,8 @@ public class WsInputStream extends java.
     }
 
 
-    private void processorRead(byte[] bytes) throws IOException {
+    private void processorRead(UpgradeProcessor<?> processor, byte[] bytes)
+            throws IOException {
         int read = 0;
         int last = 0;
         while (read < bytes.length) {
@@ -113,31 +136,21 @@ public class WsInputStream extends java.
         }
     }
 
-    // ----------------------------------------------------- InputStream 
methods
 
-    @Override
-    public int read() throws IOException {
-        while (remaining == 0 && !getFrameHeader().getFin()) {
-            // Need more data - process next frame
-            processFrameHeader();
-
-            if (getFrameHeader().getOpCode() != Constants.OPCODE_CONTINUATION) 
{
+    /*
+     * Intended to read whole payload. Therefore able to unmask.
+     */
+    private void processorRead(UpgradeProcessor<?> processor, ByteBuffer bb)
+            throws IOException {
+        int last = 0;
+        while (bb.hasRemaining()) {
+            last = processor.read();
+            if (last == -1) {
                 // TODO i18n
-                throw new IOException("Not a continuation frame");
+                throw new IOException("End of stream before end of frame");
             }
+            bb.put((byte) (last ^ mask[bb.position() % 4]));
         }
-
-        if (remaining == 0) {
-            return -1;
-        }
-
-        remaining--;
-        readThisFragment++;
-
-        int masked = processor.read();
-        if(masked == -1) {
-            return -1;
-        }
-        return masked ^ mask[(int) ((readThisFragment - 1) % 4)];
+        bb.flip();
     }
 }

Modified: tomcat/trunk/java/org/apache/catalina/websocket/WsInputStream.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/websocket/WsInputStream.java?rev=1292598&r1=1292597&r2=1292598&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/websocket/WsInputStream.java (original)
+++ tomcat/trunk/java/org/apache/catalina/websocket/WsInputStream.java Thu Feb 23 00:40:02 2012
@@ -18,7 +18,6 @@ package org.apache.catalina.websocket;
 
 import java.io.IOException;
 
-import org.apache.catalina.util.Conversions;
 import org.apache.coyote.http11.upgrade.UpgradeProcessor;
 
 /**
@@ -30,98 +29,36 @@ import org.apache.coyote.http11.upgrade.
 public class WsInputStream extends java.io.InputStream {
 
     private UpgradeProcessor<?> processor;
-    private WsFrameHeader wsFrameHeader;
-    private long payloadLength = -1;
-    private int[] mask = new int[4];
-
+    private WsFrame frame;
 
     private long remaining;
     private long readThisFragment;
 
     public WsInputStream(UpgradeProcessor<?> processor) throws IOException {
         this.processor = processor;
-
-        processFrameHeader();
+        processFrame();
     }
 
-
-    private void processFrameHeader() throws IOException {
-
-        // TODO: Per frame extension handling is not currently supported.
-
-        // TODO: Handle control frames between fragments
-
-        this.wsFrameHeader = new WsFrameHeader(processorRead());
-
-        // Client data must be masked
-        int i = processorRead();
-        if ((i & 0x80) == 0) {
-            // TODO: StringManager / i18n
-            throw new IOException("Client frame not masked");
-        }
-
-        payloadLength = i & 0x7F;
-        if (payloadLength == 126) {
-            byte[] extended = new byte[2];
-            processorRead(extended);
-            payloadLength = Conversions.byteArrayToLong(extended);
-        } else if (payloadLength == 127) {
-            byte[] extended = new byte[8];
-            processorRead(extended);
-            payloadLength = Conversions.byteArrayToLong(extended);
-        }
-        remaining = payloadLength;
-
-        for (int j = 0; j < mask.length; j++) {
-            mask[j] = processor.read() & 0xFF;
-        }
-
-        readThisFragment = 0;
+    public WsFrame getFrame() {
+        return frame;
     }
 
-    public WsFrameHeader getFrameHeader() {
-        return wsFrameHeader;
-    }
-
-    public long getPayloadLength() {
-        return payloadLength;
-    }
-
-
-    // ----------------------------------- Guaranteed read methods for 
processor
-
-    private int processorRead() throws IOException {
-        int result = processor.read();
-        if (result == -1) {
-            // TODO i18n
-            throw new IOException("End of stream before end of frame");
-        }
-        return result;
+    private void processFrame() throws IOException {
+        frame = new WsFrame(processor);
+        readThisFragment = 0;
+        remaining = frame.getPayLoadLength();
     }
 
 
-    private void processorRead(byte[] bytes) throws IOException {
-        int read = 0;
-        int last = 0;
-        while (read < bytes.length) {
-            last = processor.read(bytes, read, bytes.length - read);
-            if (last == -1) {
-                // TODO i18n
-                throw new IOException("End of stream before end of frame");
-            }
-            read += last;
-        }
-    }
-
     // ----------------------------------------------------- InputStream 
methods
 
     @Override
     public int read() throws IOException {
-        while (remaining == 0 && !getFrameHeader().getFin()) {
+        while (remaining == 0 && !getFrame().getFin()) {
             // Need more data - process next frame
-            processFrameHeader();
+            processFrame();
 
-            if (getFrameHeader().getOpCode() != Constants.OPCODE_CONTINUATION) 
{
+            if (getFrame().getOpCode() != Constants.OPCODE_CONTINUATION) {
                 // TODO i18n
                 throw new IOException("Not a continuation frame");
             }
@@ -138,6 +75,6 @@ public class WsInputStream extends java.
         if(masked == -1) {
             return -1;
         }
-        return masked ^ mask[(int) ((readThisFragment - 1) % 4)];
+        return masked ^ frame.getMask()[(int) ((readThisFragment - 1) % 4)];
     }
 }

Modified: tomcat/trunk/java/org/apache/catalina/websocket/WsOutbound.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/websocket/WsOutbound.java?rev=1292598&r1=1292597&r2=1292598&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/websocket/WsOutbound.java (original)
+++ tomcat/trunk/java/org/apache/catalina/websocket/WsOutbound.java Thu Feb 23 00:40:02 2012
@@ -126,15 +126,15 @@ public class WsOutbound {
         upgradeOutbound.write(0x88);
         if (status == 0) {
             upgradeOutbound.write(0);
-        } else if (data == null) {
+        } else if (data == null || data.position() == data.limit()) {
             upgradeOutbound.write(2);
             upgradeOutbound.write(status >>> 8);
             upgradeOutbound.write(status);
         } else {
-            upgradeOutbound.write(2 + data.limit());
+            upgradeOutbound.write(2 + data.limit() - data.position());
             upgradeOutbound.write(status >>> 8);
             upgradeOutbound.write(status);
-            upgradeOutbound.write(data.array(), 0, data.limit());
+            upgradeOutbound.write(data.array(), data.position(), data.limit());
         }
         upgradeOutbound.flush();
 



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to