Author: markt
Date: Tue Feb 28 21:22:19 2012
New Revision: 1294839
URL: http://svn.apache.org/viewvc?rev=1294839&view=rev
Log: Support non-blocking IO for WebSockets (between messages) if the endpoint supports it.
Modified: tomcat/trunk/java/org/apache/catalina/websocket/StreamInbound.java tomcat/trunk/java/org/apache/catalina/websocket/WsFrame.java tomcat/trunk/java/org/apache/catalina/websocket/WsInputStream.java Modified: tomcat/trunk/java/org/apache/catalina/websocket/StreamInbound.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/websocket/StreamInbound.java?rev=1294839&r1=1294838&r2=1294839&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/websocket/StreamInbound.java (original) +++ tomcat/trunk/java/org/apache/catalina/websocket/StreamInbound.java Tue Feb 28 21:22:19 2012 @@ -65,14 +65,11 @@ public abstract class StreamInbound impl // Must be start the start of a message (which may consist of multiple // frames) - // TODO - change this test to check if there is data to read - while (true) { - try { - // New WsInputStream for each message (not each frame) - WsInputStream wsIs = - new WsInputStream(processor, getWsOutbound()); - WsFrame frame = wsIs.getFrame(); + WsInputStream wsIs = new WsInputStream(processor, getWsOutbound()); + WsFrame frame = wsIs.nextFrame(true); + while (frame != null) { + try { // TODO User defined extensions may define values for rsv if (frame.getRsv() > 0) { getWsOutbound().close(1002, null); @@ -113,9 +110,9 @@ public abstract class StreamInbound impl getWsOutbound().close(1002, null); return SocketState.CLOSED; } + frame = wsIs.nextFrame(false); } - // TODO Required once while loop is fixed - // return SocketState.UPGRADED; + return SocketState.UPGRADED; } Modified: tomcat/trunk/java/org/apache/catalina/websocket/WsFrame.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/websocket/WsFrame.java?rev=1294839&r1=1294838&r2=1294839&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/websocket/WsFrame.java (original) +++ 
tomcat/trunk/java/org/apache/catalina/websocket/WsFrame.java Tue Feb 28 21:22:19 2012 @@ -46,6 +46,7 @@ public class WsFrame { * Create the new WebSocket frame, reading data from the processor as * necessary. * + * @param first First byte of data for this frame * @param processor Processor associated with the WebSocket connection on * which the frame has been sent * @@ -53,9 +54,10 @@ public class WsFrame { * exception will trigger the closing of the WebSocket * connection. */ - public WsFrame(UpgradeProcessor<?> processor) throws IOException { + private WsFrame(byte first, + UpgradeProcessor<?> processor) throws IOException { - int b = blockingRead(processor); + int b = first & 0xFF; fin = (b & 0x80) > 0; rsv = (b & 0x70) >>> 4; opCode = (byte) (b & 0x0F); @@ -184,4 +186,37 @@ public class WsFrame { } bb.flip(); } + + + /** + * Read the next WebSocket frame, reading data from the processor as + * necessary. + * + * @param processor Processor associated with the WebSocket connection on + * which the frame has been sent + * + * @param block Should this method block until a frame is presented if no + * data is currently available to process. Note that is a + * single byte is available, this method will block until the + * complete frame (excluding payload for non-control frames) is + * available. + * + * @throws IOException If a problem occurs processing the frame. Any + * exception will trigger the closing of the WebSocket + * connection. 
+ */ + public static WsFrame nextFrame(UpgradeProcessor<?> processor, + boolean block) throws IOException { + + byte[] first = new byte[1]; + int read = processor.read(block, first, 0, 1); + if (read == 1) { + return new WsFrame(first[0], processor); + } else if (read == 0) { + return null; + } else { + // TODO message + throw new IOException(); + } + } } Modified: tomcat/trunk/java/org/apache/catalina/websocket/WsInputStream.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/websocket/WsInputStream.java?rev=1294839&r1=1294838&r2=1294839&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/websocket/WsInputStream.java (original) +++ tomcat/trunk/java/org/apache/catalina/websocket/WsInputStream.java Tue Feb 28 21:22:19 2012 @@ -43,26 +43,36 @@ public class WsInputStream extends java. private String error = null; - public WsInputStream(UpgradeProcessor<?> processor, WsOutbound outbound) - throws IOException { + public WsInputStream(UpgradeProcessor<?> processor, WsOutbound outbound) { this.processor = processor; this.outbound = outbound; - processFrame(); } - public WsFrame getFrame() { + /** + * Process the next WebSocket frame. + * + * @param block Should this method block until a frame is presented if no + * data is currently available to process. Note that is a + * single byte is available, this method will block until the + * complete frame (excluding payload for non-control frames) is + * available. + * + * @return The next frame to be processed or <code>null</code> if block is + * <code>false</code> and there is no data to be processed. + * + * @throws IOException If a problem occurs reading the data for the frame. 
+ */ + public WsFrame nextFrame(boolean block) throws IOException { + frame = WsFrame.nextFrame(processor, block); + if (frame != null) { + readThisFragment = 0; + remaining = frame.getPayLoadLength(); + } return frame; } - private void processFrame() throws IOException { - frame = new WsFrame(processor); - readThisFragment = 0; - remaining = frame.getPayLoadLength(); - } - - // ----------------------------------------------------- InputStream methods @Override @@ -120,25 +130,25 @@ public class WsInputStream extends java. if (error != null) { throw new IOException(error); } - while (remaining == 0 && !getFrame().getFin()) { + while (remaining == 0 && !frame.getFin()) { // Need more data - process next frame - processFrame(); + nextFrame(true); while (frame.isControl()) { - if (getFrame().getOpCode() == Constants.OPCODE_PING) { + if (frame.getOpCode() == Constants.OPCODE_PING) { outbound.pong(frame.getPayLoad()); - } else if (getFrame().getOpCode() == Constants.OPCODE_PONG) { + } else if (frame.getOpCode() == Constants.OPCODE_PONG) { // NO-OP. Swallow it. - } else if (getFrame().getOpCode() == Constants.OPCODE_CLOSE) { + } else if (frame.getOpCode() == Constants.OPCODE_CLOSE) { outbound.close(frame); } else{ throw new IOException(sm.getString("is.unknownOpCode", - Byte.valueOf(getFrame().getOpCode()))); + Byte.valueOf(frame.getOpCode()))); } - processFrame(); + nextFrame(true); } - if (getFrame().getOpCode() != Constants.OPCODE_CONTINUATION) { + if (frame.getOpCode() != Constants.OPCODE_CONTINUATION) { error = sm.getString("is.notContinutation", - Byte.valueOf(getFrame().getOpCode())); + Byte.valueOf(frame.getOpCode())); throw new IOException(error); } } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org