Author: markt
Date: Tue Feb 28 21:22:19 2012
New Revision: 1294839
URL: http://svn.apache.org/viewvc?rev=1294839&view=rev
Log:
Support non-blocking IO for WebSockets (between messages) if the
endpoint supports it.
Modified:
tomcat/trunk/java/org/apache/catalina/websocket/StreamInbound.java
tomcat/trunk/java/org/apache/catalina/websocket/WsFrame.java
tomcat/trunk/java/org/apache/catalina/websocket/WsInputStream.java
Modified: tomcat/trunk/java/org/apache/catalina/websocket/StreamInbound.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/websocket/StreamInbound.java?rev=1294839&r1=1294838&r2=1294839&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/websocket/StreamInbound.java (original)
+++ tomcat/trunk/java/org/apache/catalina/websocket/StreamInbound.java Tue Feb 28 21:22:19 2012
@@ -65,14 +65,11 @@ public abstract class StreamInbound impl
// Must be start the start of a message (which may consist of multiple
// frames)
- // TODO - change this test to check if there is data to read
- while (true) {
- try {
- // New WsInputStream for each message (not each frame)
- WsInputStream wsIs =
- new WsInputStream(processor, getWsOutbound());
- WsFrame frame = wsIs.getFrame();
+ WsInputStream wsIs = new WsInputStream(processor, getWsOutbound());
+ WsFrame frame = wsIs.nextFrame(true);
+ while (frame != null) {
+ try {
// TODO User defined extensions may define values for rsv
if (frame.getRsv() > 0) {
getWsOutbound().close(1002, null);
@@ -113,9 +110,9 @@ public abstract class StreamInbound impl
getWsOutbound().close(1002, null);
return SocketState.CLOSED;
}
+ frame = wsIs.nextFrame(false);
}
- // TODO Required once while loop is fixed
- // return SocketState.UPGRADED;
+ return SocketState.UPGRADED;
}
Modified: tomcat/trunk/java/org/apache/catalina/websocket/WsFrame.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/websocket/WsFrame.java?rev=1294839&r1=1294838&r2=1294839&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/websocket/WsFrame.java (original)
+++ tomcat/trunk/java/org/apache/catalina/websocket/WsFrame.java Tue Feb 28 21:22:19 2012
@@ -46,6 +46,7 @@ public class WsFrame {
* Create the new WebSocket frame, reading data from the processor as
* necessary.
*
+ * @param first First byte of data for this frame
* @param processor Processor associated with the WebSocket connection on
* which the frame has been sent
*
@@ -53,9 +54,10 @@ public class WsFrame {
* exception will trigger the closing of the WebSocket
* connection.
*/
- public WsFrame(UpgradeProcessor<?> processor) throws IOException {
+ private WsFrame(byte first,
+ UpgradeProcessor<?> processor) throws IOException {
- int b = blockingRead(processor);
+ int b = first & 0xFF;
fin = (b & 0x80) > 0;
rsv = (b & 0x70) >>> 4;
opCode = (byte) (b & 0x0F);
@@ -184,4 +186,37 @@ public class WsFrame {
}
bb.flip();
}
+
+
+ /**
+ * Read the next WebSocket frame, reading data from the processor as
+ * necessary.
+ *
+ * @param processor Processor associated with the WebSocket connection on
+ * which the frame has been sent
+ *
+ * @param block Should this method block until a frame is presented if no
+ * data is currently available to process. Note that if a
+ * single byte is available, this method will block until the
+ * complete frame (excluding payload for non-control frames) is
+ * available.
+ *
+ * @throws IOException If a problem occurs processing the frame. Any
+ * exception will trigger the closing of the WebSocket
+ * connection.
+ */
+ public static WsFrame nextFrame(UpgradeProcessor<?> processor,
+ boolean block) throws IOException {
+
+ byte[] first = new byte[1];
+ int read = processor.read(block, first, 0, 1);
+ if (read == 1) {
+ return new WsFrame(first[0], processor);
+ } else if (read == 0) {
+ return null;
+ } else {
+ // TODO message
+ throw new IOException();
+ }
+ }
}
Modified: tomcat/trunk/java/org/apache/catalina/websocket/WsInputStream.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/websocket/WsInputStream.java?rev=1294839&r1=1294838&r2=1294839&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/websocket/WsInputStream.java (original)
+++ tomcat/trunk/java/org/apache/catalina/websocket/WsInputStream.java Tue Feb 28 21:22:19 2012
@@ -43,26 +43,36 @@ public class WsInputStream extends java.
private String error = null;
- public WsInputStream(UpgradeProcessor<?> processor, WsOutbound outbound)
- throws IOException {
+ public WsInputStream(UpgradeProcessor<?> processor, WsOutbound outbound) {
this.processor = processor;
this.outbound = outbound;
- processFrame();
}
- public WsFrame getFrame() {
+ /**
+ * Process the next WebSocket frame.
+ *
+ * @param block Should this method block until a frame is presented if no
+ * data is currently available to process. Note that if a
+ * single byte is available, this method will block until the
+ * complete frame (excluding payload for non-control frames) is
+ * available.
+ *
+ * @return The next frame to be processed or <code>null</code> if block is
+ * <code>false</code> and there is no data to be processed.
+ *
+ * @throws IOException If a problem occurs reading the data for the frame.
+ */
+ public WsFrame nextFrame(boolean block) throws IOException {
+ frame = WsFrame.nextFrame(processor, block);
+ if (frame != null) {
+ readThisFragment = 0;
+ remaining = frame.getPayLoadLength();
+ }
return frame;
}
- private void processFrame() throws IOException {
- frame = new WsFrame(processor);
- readThisFragment = 0;
- remaining = frame.getPayLoadLength();
- }
-
-
// ----------------------------------------------------- InputStream methods
@Override
@@ -120,25 +130,25 @@ public class WsInputStream extends java.
if (error != null) {
throw new IOException(error);
}
- while (remaining == 0 && !getFrame().getFin()) {
+ while (remaining == 0 && !frame.getFin()) {
// Need more data - process next frame
- processFrame();
+ nextFrame(true);
while (frame.isControl()) {
- if (getFrame().getOpCode() == Constants.OPCODE_PING) {
+ if (frame.getOpCode() == Constants.OPCODE_PING) {
outbound.pong(frame.getPayLoad());
- } else if (getFrame().getOpCode() == Constants.OPCODE_PONG) {
+ } else if (frame.getOpCode() == Constants.OPCODE_PONG) {
// NO-OP. Swallow it.
- } else if (getFrame().getOpCode() == Constants.OPCODE_CLOSE) {
+ } else if (frame.getOpCode() == Constants.OPCODE_CLOSE) {
outbound.close(frame);
} else{
throw new IOException(sm.getString("is.unknownOpCode",
- Byte.valueOf(getFrame().getOpCode())));
+ Byte.valueOf(frame.getOpCode())));
}
- processFrame();
+ nextFrame(true);
}
- if (getFrame().getOpCode() != Constants.OPCODE_CONTINUATION) {
+ if (frame.getOpCode() != Constants.OPCODE_CONTINUATION) {
error = sm.getString("is.notContinutation",
- Byte.valueOf(getFrame().getOpCode()));
+ Byte.valueOf(frame.getOpCode()));
throw new IOException(error);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]