Author: markt
Date: Tue Feb 28 21:22:19 2012
New Revision: 1294839

URL: http://svn.apache.org/viewvc?rev=1294839&view=rev
Log:
Support non-blocking IO for WebSockets (between messages) if the
endpoint supports it.

Modified:
    tomcat/trunk/java/org/apache/catalina/websocket/StreamInbound.java
    tomcat/trunk/java/org/apache/catalina/websocket/WsFrame.java
    tomcat/trunk/java/org/apache/catalina/websocket/WsInputStream.java

Modified: tomcat/trunk/java/org/apache/catalina/websocket/StreamInbound.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/websocket/StreamInbound.java?rev=1294839&r1=1294838&r2=1294839&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/websocket/StreamInbound.java (original)
+++ tomcat/trunk/java/org/apache/catalina/websocket/StreamInbound.java Tue Feb 28 21:22:19 2012
@@ -65,14 +65,11 @@ public abstract class StreamInbound impl
         // Must be start the start of a message (which may consist of multiple
         // frames)
 
-        // TODO - change this test to check if there is data to read
-        while (true) {
-            try {
-                // New WsInputStream for each message (not each frame)
-                WsInputStream wsIs =
-                        new WsInputStream(processor, getWsOutbound());
-                WsFrame frame = wsIs.getFrame();
+        WsInputStream wsIs = new WsInputStream(processor, getWsOutbound());
+        WsFrame frame = wsIs.nextFrame(true);
 
+        while (frame != null) {
+            try {
                 // TODO User defined extensions may define values for rsv
                 if (frame.getRsv() > 0) {
                     getWsOutbound().close(1002, null);
@@ -113,9 +110,9 @@ public abstract class StreamInbound impl
                 getWsOutbound().close(1002, null);
                 return SocketState.CLOSED;
             }
+            frame = wsIs.nextFrame(false);
         }
-        // TODO Required once while loop is fixed
-        // return SocketState.UPGRADED;
+        return SocketState.UPGRADED;
     }
 
 

Modified: tomcat/trunk/java/org/apache/catalina/websocket/WsFrame.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/websocket/WsFrame.java?rev=1294839&r1=1294838&r2=1294839&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/websocket/WsFrame.java (original)
+++ tomcat/trunk/java/org/apache/catalina/websocket/WsFrame.java Tue Feb 28 21:22:19 2012
@@ -46,6 +46,7 @@ public class WsFrame {
      * Create the new WebSocket frame, reading data from the processor as
      * necessary.
      *
+     * @param first     First byte of data for this frame
      * @param processor Processor associated with the WebSocket connection on
      *                  which the frame has been sent
      *
@@ -53,9 +54,10 @@ public class WsFrame {
      *                      exception will trigger the closing of the WebSocket
      *                      connection.
      */
-    public WsFrame(UpgradeProcessor<?> processor) throws IOException {
+    private WsFrame(byte first,
+            UpgradeProcessor<?> processor) throws IOException {
 
-        int b = blockingRead(processor);
+        int b = first & 0xFF;
         fin = (b & 0x80) > 0;
         rsv = (b & 0x70) >>> 4;
         opCode = (byte) (b & 0x0F);
@@ -184,4 +186,37 @@ public class WsFrame {
         }
         bb.flip();
     }
+
+
+    /**
+     * Read the next WebSocket frame, reading data from the processor as
+     * necessary.
+     *
+     * @param processor Processor associated with the WebSocket connection on
+     *                  which the frame has been sent
+     *
+     * @param block Should this method block until a frame is presented if no
+     *              data is currently available to process. Note that if a
+     *              single byte is available, this method will block until the
+     *              complete frame (excluding payload for non-control frames) is
+     *              available.
+     *
+     * @throws IOException  If a problem occurs processing the frame. Any
+     *                      exception will trigger the closing of the WebSocket
+     *                      connection.
+     */
+    public static WsFrame nextFrame(UpgradeProcessor<?> processor,
+            boolean block) throws IOException {
+
+        byte[] first = new byte[1];
+        int read = processor.read(block, first, 0, 1);
+        if (read == 1) {
+            return new WsFrame(first[0], processor);
+        } else if (read == 0) {
+            return null;
+        } else {
+            // TODO message
+            throw new IOException();
+        }
+    }
 }

Modified: tomcat/trunk/java/org/apache/catalina/websocket/WsInputStream.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/websocket/WsInputStream.java?rev=1294839&r1=1294838&r2=1294839&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/websocket/WsInputStream.java (original)
+++ tomcat/trunk/java/org/apache/catalina/websocket/WsInputStream.java Tue Feb 28 21:22:19 2012
@@ -43,26 +43,36 @@ public class WsInputStream extends java.
     private String error = null;
 
 
-    public WsInputStream(UpgradeProcessor<?> processor, WsOutbound outbound)
-            throws IOException {
+    public WsInputStream(UpgradeProcessor<?> processor, WsOutbound outbound) {
         this.processor = processor;
         this.outbound = outbound;
-        processFrame();
     }
 
 
-    public WsFrame getFrame() {
+    /**
+     * Process the next WebSocket frame.
+     *
+     * @param block Should this method block until a frame is presented if no
+     *              data is currently available to process. Note that if a
+     *              single byte is available, this method will block until the
+     *              complete frame (excluding payload for non-control frames) is
+     *              available.
+     *
+     * @return  The next frame to be processed or <code>null</code> if block is
+     *          <code>false</code> and there is no data to be processed.
+     *
+     * @throws IOException  If a problem occurs reading the data for the frame.
+     */
+    public WsFrame nextFrame(boolean block) throws IOException {
+        frame = WsFrame.nextFrame(processor, block);
+        if (frame != null) {
+            readThisFragment = 0;
+            remaining = frame.getPayLoadLength();
+        }
         return frame;
     }
 
 
-    private void processFrame() throws IOException {
-        frame = new WsFrame(processor);
-        readThisFragment = 0;
-        remaining = frame.getPayLoadLength();
-    }
-
-
     // ----------------------------------------------------- InputStream methods
 
     @Override
@@ -120,25 +130,25 @@ public class WsInputStream extends java.
         if (error != null) {
             throw new IOException(error);
         }
-        while (remaining == 0 && !getFrame().getFin()) {
+        while (remaining == 0 && !frame.getFin()) {
             // Need more data - process next frame
-            processFrame();
+            nextFrame(true);
             while (frame.isControl()) {
-                if (getFrame().getOpCode() == Constants.OPCODE_PING) {
+                if (frame.getOpCode() == Constants.OPCODE_PING) {
                     outbound.pong(frame.getPayLoad());
-                } else if (getFrame().getOpCode() == Constants.OPCODE_PONG) {
+                } else if (frame.getOpCode() == Constants.OPCODE_PONG) {
                     // NO-OP. Swallow it.
-                } else if (getFrame().getOpCode() == Constants.OPCODE_CLOSE) {
+                } else if (frame.getOpCode() == Constants.OPCODE_CLOSE) {
                     outbound.close(frame);
                 } else{
                     throw new IOException(sm.getString("is.unknownOpCode",
-                            Byte.valueOf(getFrame().getOpCode())));
+                            Byte.valueOf(frame.getOpCode())));
                 }
-                processFrame();
+                nextFrame(true);
             }
-            if (getFrame().getOpCode() != Constants.OPCODE_CONTINUATION) {
+            if (frame.getOpCode() != Constants.OPCODE_CONTINUATION) {
                 error = sm.getString("is.notContinutation",
-                        Byte.valueOf(getFrame().getOpCode()));
+                        Byte.valueOf(frame.getOpCode()));
                 throw new IOException(error);
             }
         }



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to