Author: markt Date: Fri May 29 08:48:56 2015 New Revision: 1682395 URL: http://svn.apache.org/r1682395 Log: Plumb in basic support for reading request bodies.
Modified: tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java tomcat/trunk/java/org/apache/coyote/http2/LocalStrings.properties tomcat/trunk/java/org/apache/coyote/http2/Stream.java Modified: tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java?rev=1682395&r1=1682394&r2=1682395&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java (original) +++ tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java Fri May 29 08:48:56 2015 @@ -275,6 +275,9 @@ public class Http2UpgradeHandler extends int payloadSize = getPayloadSize(streamId, frameHeader); switch (frameType) { + case FRAME_TYPE_DATA: + processFrameData(flags, streamId, payloadSize); + break; case FRAME_TYPE_HEADERS: processFrameHeaders(flags, streamId, payloadSize); break; @@ -298,6 +301,46 @@ public class Http2UpgradeHandler extends } + private void processFrameData(int flags, int streamId, int payloadSize) throws IOException { + if (log.isDebugEnabled()) { + log.debug(sm.getString("upgradeHandler.processFrame", + Long.toString(connectionId), Integer.toString(streamId), + Integer.toString(flags), Integer.toString(payloadSize))); + } + + // Validate the stream + if (streamId == 0) { + throw new Http2Exception(sm.getString("upgradeHandler.processFrameData.invalidStream"), + 0, Http2Exception.PROTOCOL_ERROR); + } + + // Process the Stream + // TODO Handle end of stream flag + int padLength = 0; + + boolean endOfStream = (flags & 0x01) > 0; + boolean padding = (flags & 0x08) > 0; + + if (padding) { + byte[] b = new byte[1]; + readFully(b); + padLength = b[0] & 0xFF; + } + + Stream stream = getStream(streamId); + ByteBuffer dest = stream.getInputByteBuffer(); + synchronized (dest) { + readFully(dest, payloadSize); + if (endOfStream) { + stream.setEndOfStream(); + } + dest.notifyAll(); + } + + swallow(padLength); + } + + private void processFrameHeaders(int flags, int streamId, int payloadSize) throws IOException { if (log.isDebugEnabled()) { log.debug(sm.getString("upgradeHandler.processFrame", @@ -373,9 +416,7 @@ public class Http2UpgradeHandler extends 0, Http2Exception.PROTOCOL_ERROR); } - if (padLength > 0) { - swallow(padLength); - } + swallow(padLength); // Process this stream on a container thread StreamProcessor streamProcessor = new StreamProcessor(stream, adapter, socketWrapper); @@ -557,6 +598,9 @@ public class Http2UpgradeHandler extends private void swallow(int len) throws IOException { + if (len == 0) { + return; + } int read = 0; byte[] buffer = new byte[1024]; while (read < len) { Modified: tomcat/trunk/java/org/apache/coyote/http2/LocalStrings.properties URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/LocalStrings.properties?rev=1682395&r1=1682394&r2=1682395&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http2/LocalStrings.properties (original) +++ tomcat/trunk/java/org/apache/coyote/http2/LocalStrings.properties Fri May 29 08:48:56 2015 @@ -41,6 +41,7 @@ upgradeHandler.init=Connection [{0}] upgradeHandler.ioerror=Connection [{0}] upgradeHandler.payloadTooBig=The payload is [{0}] bytes long but the maximum frame size is [{1}] upgradeHandler.processFrame=Connection [{0}], Stream [{1}], Flags [{2}], Payload size [{3}] +upgradeHandler.processFrameData.invalidStream=Data frame received for stream [0] upgradeHandler.processFrameHeaders.invalidStream=Headers frame received for stream [0] upgradeHandler.processFrameHeaders.decodingFailed=There was an error during the HPACK decoding of HTTP headers upgradeHandler.processFrameHeaders.decodingDataLeft=Data left over after HPACK decoding - it should have been consumed Modified: tomcat/trunk/java/org/apache/coyote/http2/Stream.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Stream.java?rev=1682395&r1=1682394&r2=1682395&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http2/Stream.java (original) +++ tomcat/trunk/java/org/apache/coyote/http2/Stream.java Fri May 29 08:48:56 2015 @@ -20,6 +20,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Iterator; +import org.apache.coyote.InputBuffer; import org.apache.coyote.OutputBuffer; import org.apache.coyote.Request; import org.apache.coyote.Response; @@ -39,6 +40,7 @@ public class Stream extends AbstractStre private final Http2UpgradeHandler handler; private final Request coyoteRequest = new Request(); private final Response coyoteResponse = new Response(); + private final StreamInputBuffer inputBuffer = new StreamInputBuffer(); private final StreamOutputBuffer outputBuffer = new StreamOutputBuffer(); @@ -47,6 +49,7 @@ public class Stream extends AbstractStre this.handler = handler; setParentStream(handler); setWindowSize(handler.getRemoteSettings().getInitialWindowSize()); + coyoteRequest.setInputBuffer(inputBuffer); coyoteResponse.setOutputBuffer(outputBuffer); } @@ -199,6 +202,17 @@ public class Stream extends AbstractStre } + ByteBuffer getInputByteBuffer() { + return inputBuffer.getInBuffer(); + } + + + void setEndOfStream() { + // TODO This is temporary until the state machine for a stream is + // implemented + inputBuffer.endOfStream = true; + } + StreamOutputBuffer getOutputBuffer() { return outputBuffer; } @@ -206,7 +220,7 @@ public class Stream extends AbstractStre class StreamOutputBuffer implements OutputBuffer { - private volatile ByteBuffer buffer = ByteBuffer.allocate(8 * 1024); + private final ByteBuffer buffer = ByteBuffer.allocate(8 * 1024); private volatile long written = 0; private volatile boolean finished = false; @@ -307,4 +321,72 @@ public class Stream extends AbstractStre return ((written == 0) && finished); } } + + + class StreamInputBuffer implements InputBuffer { + + /* Two buffers are required to avoid various multi-threading issues. + * These issues arise from the fact that the Stream (or the + * Request/Response) used by the application is processed in one thread + * but the connection is processed in another. Therefore it is possible + * that a request body frame could be received before the application + * is ready to read it. If it isn't buffered, processing of the + * connection (and hence all streams) would block until the application + * read the data. Hence the incoming data has to be buffered. + * If only one buffer was used then it could become corrupted if the + * connection thread is trying to add to it at the same time as the + * application is read it. While it should be possible to avoid this + * corruption by careful use of the buffer it would still require the + * same copies as using two buffers and the behaviour would be less + * clear. + */ + // This buffer is used to populate the ByteChunk passed in to the read + // method + private final byte[] outBuffer = new byte[8 * 1024]; + // This buffer is the destination for incoming data. It is normally is + // 'write mode'. + private final ByteBuffer inBuffer = ByteBuffer.allocate(8 * 1024); + + private boolean endOfStream = false; + + @Override + public int doRead(ByteChunk chunk) throws IOException { + + int written = 0; + + // Ensure that only one thread accesses inBuffer at a time + synchronized (inBuffer) { + while (inBuffer.position() == 0 && !endOfStream) { + // Need to block until some data is written + try { + inBuffer.wait(); + } catch (InterruptedException e) { + // TODO: Possible shutdown? + } + } + + if (inBuffer.position() > 0) { + // Data remains in the in buffer. Copy it to the out buffer. + inBuffer.flip(); + written = inBuffer.remaining(); + inBuffer.get(outBuffer, 0, written); + inBuffer.clear(); + } else if (endOfStream) { + return -1; + } else { + // TODO Should never happen + throw new IllegalStateException(); + } + } + + chunk.setBytes(outBuffer, 0, written); + + return written; + } + + + public ByteBuffer getInBuffer() { + return inBuffer; + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org