Author: markt Date: Thu May 21 13:11:32 2015 New Revision: 1680867 URL: http://svn.apache.org/r1680867 Log: Get basic response body writing working. Lots of TODOs remain (flow control is currently ignored, stream state is not stracked etc.) but it is in a state where at least some of the examples app can be used.
Modified: tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java tomcat/trunk/java/org/apache/coyote/http2/Stream.java tomcat/trunk/java/org/apache/coyote/http2/StreamProcessor.java Modified: tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java?rev=1680867&r1=1680866&r2=1680867&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java (original) +++ tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java Thu May 21 13:11:32 2015 @@ -66,6 +66,7 @@ public class Http2UpgradeHandler extends private static final int FLAG_END_OF_STREAM = 1; private static final int FLAG_END_OF_HEADERS = 4; + private static final int FRAME_TYPE_DATA = 0; private static final int FRAME_TYPE_HEADERS = 1; private static final int FRAME_TYPE_PRIORITY = 2; private static final int FRAME_TYPE_SETTINGS = 4; @@ -701,12 +702,14 @@ public class Http2UpgradeHandler extends ByteUtil.setThreeBytes(header, 0, target.limit()); if (first) { header[3] = FRAME_TYPE_HEADERS; + if (stream.getOutputBuffer().hasNoBody()) { + header[4] = FLAG_END_OF_STREAM; + } } else { header[3] = FRAME_TYPE_CONTINUATION; } if (state == State.COMPLETE) { - // TODO Determine end of stream correctly - header[4] = FLAG_END_OF_HEADERS + FLAG_END_OF_STREAM; + header[4] += FLAG_END_OF_HEADERS; } if (log.isDebugEnabled()) { log.debug(target.limit() + " bytes"); @@ -719,6 +722,29 @@ public class Http2UpgradeHandler extends } } + + void writeBody(Stream stream, ByteBuffer data) throws IOException { + data.flip(); + if (log.isDebugEnabled()) { + log.debug(sm.getString("upgradeHandler.writeBody", Integer.toString(connectionId), + stream.getIdentifier(), Integer.toString(data.remaining()))); + } + synchronized (socketWrapper) { + // TODO Manage window sizes + byte[] header = new byte[9]; + ByteUtil.setThreeBytes(header, 0, data.remaining()); + header[3] = FRAME_TYPE_DATA; + if (stream.getOutputBuffer().isFinished()) { + header[4] = FLAG_END_OF_STREAM; + } + ByteUtil.set31Bits(header, 5, stream.getIdentifier().intValue()); + socketWrapper.write(true, header, 0, header.length); + socketWrapper.write(true, data.array(), data.arrayOffset(), data.limit()); + socketWrapper.flush(true); + } + } + + private void processWrites() throws IOException { if (socketWrapper.flush(false)) { socketWrapper.registerWriteInterest(); Modified: tomcat/trunk/java/org/apache/coyote/http2/Stream.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Stream.java?rev=1680867&r1=1680866&r2=1680867&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http2/Stream.java (original) +++ tomcat/trunk/java/org/apache/coyote/http2/Stream.java Thu May 21 13:11:32 2015 @@ -17,6 +17,7 @@ package org.apache.coyote.http2; import java.io.IOException; +import java.nio.ByteBuffer; import org.apache.coyote.OutputBuffer; import org.apache.coyote.Request; @@ -35,6 +36,7 @@ public class Stream extends AbstractStre private final Http2UpgradeHandler handler; private final Request coyoteRequest = new Request(); private final Response coyoteResponse = new Response(); + private final StreamOutputBuffer outputBuffer = new StreamOutputBuffer(); private volatile long flowControlWindowSize; @@ -44,7 +46,7 @@ public class Stream extends AbstractStre this.handler = handler; setParentStream(handler); flowControlWindowSize = handler.getRemoteSettings().getInitialWindowSize(); - coyoteResponse.setOutputBuffer(new StreamOutputBuffer()); + coyoteResponse.setOutputBuffer(outputBuffer); } @@ -110,8 +112,12 @@ public class Stream extends AbstractStre log.debug(sm.getString("stream.write", Long.toString(getConnectionId()), getIdentifier())); } - // TODO - handler.addWrite("DATA"); + try { + outputBuffer.flush(); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } } @@ -127,31 +133,76 @@ public class Stream extends AbstractStre } - public Request getCoyoteRequest() { + Request getCoyoteRequest() { return coyoteRequest; } - public Response getCoyoteResponse() { + Response getCoyoteResponse() { return coyoteResponse; } - private class StreamOutputBuffer implements OutputBuffer { + StreamOutputBuffer getOutputBuffer() { + return outputBuffer; + } + + + class StreamOutputBuffer implements OutputBuffer { + private volatile ByteBuffer buffer = ByteBuffer.allocate(8 * 1024); private volatile long written = 0; + private volatile boolean finished = false; @Override public int doWrite(ByteChunk chunk) throws IOException { - // TODO Blocking. Write to buffer. flushData() if full. - log.debug("Write [" + chunk.getLength() + "] bytes"); - written += chunk.getLength(); - return chunk.getLength(); + if (finished) { + // TODO i18n + throw new IllegalStateException(); + } + int len = chunk.getLength(); + int offset = 0; + while (len > 0) { + int thisTime = Math.min(buffer.remaining(), len); + buffer.put(chunk.getBytes(), chunk.getOffset() + offset, thisTime); + offset += thisTime; + len -= thisTime; + if (!buffer.hasRemaining()) { + flush(); + } + } + written += offset; + return offset; + } + + public void flush() throws IOException { + if (buffer.position() == 0) { + // Buffer is empty. Nothing to do. + return; + } + handler.writeBody(Stream.this, buffer); + buffer.clear(); } @Override public long getBytesWritten() { return written; } + + public void finished() { + finished = true; + } + + public boolean isFinished() { + return finished; + } + + /** + * @return <code>true</code> if it is certain that the associated + * response has no body. + */ + public boolean hasNoBody() { + return ((written == 0) && finished); + } } } Modified: tomcat/trunk/java/org/apache/coyote/http2/StreamProcessor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/StreamProcessor.java?rev=1680867&r1=1680866&r2=1680867&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http2/StreamProcessor.java (original) +++ tomcat/trunk/java/org/apache/coyote/http2/StreamProcessor.java Thu May 21 13:11:32 2015 @@ -56,7 +56,7 @@ public class StreamProcessor extends Abs try { adapter.service(request, response); // Ensure the response is complete - response.action(ActionCode.CLIENT_FLUSH, null); + response.action(ActionCode.CLOSE, null); } catch (Exception e) { // TODO e.printStackTrace(); @@ -67,12 +67,18 @@ public class StreamProcessor extends Abs @Override public void action(ActionCode actionCode, Object param) { switch (actionCode) { - case REQ_HOST_ADDR_ATTRIBUTE: { - request.remoteAddr().setString(socketWrapper.getRemoteAddr()); + case COMMIT: { + if (!response.isCommitted()) { + response.setCommitted(true); + stream.writeHeaders(); + } break; } - case IS_ERROR: { - ((AtomicBoolean) param).set(getErrorState().isError()); + case CLOSE: { + // Tell the output buffer there will be no more data + stream.getOutputBuffer().finished(); + // Then flush it + action(ActionCode.CLIENT_FLUSH, null); break; } case CLIENT_FLUSH: { @@ -80,11 +86,12 @@ public class StreamProcessor extends Abs stream.flushData(); break; } - case COMMIT: { - if (!response.isCommitted()) { - response.setCommitted(true); - stream.writeHeaders(); - } + case REQ_HOST_ADDR_ATTRIBUTE: { + request.remoteAddr().setString(socketWrapper.getRemoteAddr()); + break; + } + case IS_ERROR: { + ((AtomicBoolean) param).set(getErrorState().isError()); break; } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org