Author: costin Date: Tue Mar 13 05:37:14 2012 New Revision: 1299981 URL: http://svn.apache.org/viewvc?rev=1299981&view=rev Log: Update the spdy implementation to use the non-blocking apr socket. Fix various bugs found while stress testing.
Added: tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContextJni.java Modified: tomcat/trunk/java/org/apache/coyote/spdy/SpdyAprNpnHandler.java tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java tomcat/trunk/java/org/apache/tomcat/spdy/SpdyConnection.java tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContext.java tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContextProxy.java tomcat/trunk/java/org/apache/tomcat/spdy/SpdyFrame.java tomcat/trunk/java/org/apache/tomcat/spdy/SpdyStream.java Modified: tomcat/trunk/java/org/apache/coyote/spdy/SpdyAprNpnHandler.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/spdy/SpdyAprNpnHandler.java?rev=1299981&r1=1299980&r2=1299981&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/spdy/SpdyAprNpnHandler.java (original) +++ tomcat/trunk/java/org/apache/coyote/spdy/SpdyAprNpnHandler.java Tue Mar 13 05:37:14 2012 @@ -27,9 +27,11 @@ import org.apache.juli.logging.LogFactor import org.apache.tomcat.jni.Error; import org.apache.tomcat.jni.SSLExt; import org.apache.tomcat.jni.Status; +import org.apache.tomcat.jni.socket.AprSocketContext; import org.apache.tomcat.spdy.CompressDeflater6; import org.apache.tomcat.spdy.SpdyConnection; import org.apache.tomcat.spdy.SpdyContext; +import org.apache.tomcat.spdy.SpdyContextJni; import org.apache.tomcat.spdy.SpdyStream; import org.apache.tomcat.util.net.AbstractEndpoint; import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState; @@ -65,7 +67,7 @@ public class SpdyAprNpnHandler implement private static final Log log = LogFactory.getLog(AprEndpoint.class); - private SpdyContext spdyContext; + private SpdyContextApr spdyContext; boolean ssl = true; @@ -88,7 +90,7 @@ public class SpdyAprNpnHandler implement } - private final class SpdyContextApr extends SpdyContext { + private final class SpdyContextApr extends SpdyContextJni { private final AbstractEndpoint ep; private final Adapter adapter; @@ -106,76 +108,6 @@ public class SpdyAprNpnHandler implement } } - public static class SpdyConnectionApr extends SpdyConnection { - long socket; - - public SpdyConnectionApr(SocketWrapper<Long> socketW, - SpdyContext spdyContext, boolean ssl) { - super(spdyContext); - this.socket = socketW.getSocket().longValue(); - if (ssl) { - setCompressSupport(new CompressDeflater6()); - } - } - - // TODO: write/read should go to SocketWrapper. - @Override - public int write(byte[] data, int off, int len) { - if (socket == 0 || inClosed) { - return -1; - } - int rem = len; - while (rem > 0) { - int sent = org.apache.tomcat.jni.Socket.send(socket, data, off, - rem); - if (sent < 0) { - inClosed = true; - return -1; - } - if (sent == 0) { - return len - rem; - } - rem -= sent; - off += sent; - } - return len; - } - - /** - */ - @Override - public int read(byte[] data, int off, int len) throws IOException { - if (socket == 0 || inClosed) { - return 0; - } - int rd = org.apache.tomcat.jni.Socket.recv(socket, data, off, len); - if (rd == -Status.APR_EOF) { - inClosed = true; - return -1; - } - if (rd == -Status.TIMEUP) { - rd = 0; - } - if (rd == -Status.EAGAIN) { - rd = 0; - } - if (rd < 0) { - // all other errors - inClosed = true; - throw new IOException("Error: " + rd + " " - + Error.strerror((int) -rd)); - } - off += rd; - len -= rd; - return rd; - } - } - - // apr normally creates a new object on each poll. - // For 'upgraded' protocols we need to remember it's handled differently. - Map<Long, SpdyConnectionApr> lightProcessors = - new HashMap<Long, SpdyConnectionApr>(); - @Override public SocketState process(SocketWrapper<Long> socketO, SocketStatus status, Http11AprProtocol proto, AbstractEndpoint endpoint) { @@ -183,61 +115,12 @@ public class SpdyAprNpnHandler implement SocketWrapper<Long> socketW = socketO; long socket = ((Long) socketW.getSocket()).longValue(); - SpdyConnectionApr lh = lightProcessors.get(socket); - // Are we getting an HTTP request ? - if (lh == null && status != SocketStatus.OPEN) { - return null; - } - - log.info("Status: " + status); - - SocketState ss = null; - if (lh != null) { - // STOP, ERROR, DISCONNECT, TIMEOUT -> onClose - if (status == SocketStatus.TIMEOUT) { - // Called from maintain - we're removed from the poll - ((AprEndpoint) endpoint).getCometPoller().add( - socketO.getSocket().longValue(), false); - return SocketState.LONG; - } - if (status == SocketStatus.STOP || status == SocketStatus.DISCONNECT || - status == SocketStatus.ERROR) { - SpdyConnectionApr wrapper = lightProcessors.remove(socket); - if (wrapper != null) { - wrapper.onClose(); - } - return SocketState.CLOSED; - } - int rc = lh.onBlockingSocket(); - ss = (rc == SpdyConnection.LONG) ? SocketState.LONG - : SocketState.CLOSED; - } else { - // OPEN, no existing socket - if (!ssl || SSLExt.checkNPN(socket, SpdyContext.SPDY_NPN)) { - // NPN negotiated or not ssl - lh = new SpdyConnectionApr(socketW, spdyContext, ssl); - - int rc = lh.onBlockingSocket(); - ss = (rc == SpdyConnection.LONG) ? SocketState.LONG - : SocketState.CLOSED; - if (ss == SocketState.LONG) { - lightProcessors.put(socketO.getSocket().longValue(), lh); - } - } else { - return null; - } - } - - // OPEN is used for both 'first time' and 'new connection' - // In theory we shouldn't get another open while this is in - // progress ( only after we add back to the poller ) - - if (ss == SocketState.LONG) { - log.info("Long poll: " + status); - ((AprEndpoint) endpoint).getCometPoller().add( - socketO.getSocket().longValue(), false); + try { + spdyContext.onAccept(socket); + } catch (IOException e) { } - return ss; + // No need to keep tomcat thread busy - but socket will be handled by apr socket context. + return SocketState.LONG; } public void onClose(SocketWrapper<Long> socketWrapper) { Modified: tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java?rev=1299981&r1=1299980&r2=1299981&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java (original) +++ tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java Tue Mar 13 05:37:14 2012 @@ -93,7 +93,7 @@ public class SpdyProcessor extends Abstr public int doRead(ByteChunk bchunk, Request request) throws IOException { if (inFrame == null) { // blocking - inFrame = spdyStream.getIn(endpoint.getSoTimeout()); + inFrame = spdyStream.getDataFrame(endpoint.getSoTimeout()); } if (inFrame == null) { return -1; @@ -388,13 +388,6 @@ public class SpdyProcessor extends Abstr } - private static byte[] STATUS = "status".getBytes(); - - private static byte[] VERSION = "version".getBytes(); - - private static byte[] HTTP11 = "HTTP/1.1".getBytes(); - - private static byte[] OK200 = "200 OK".getBytes(); /** * When committing the response, we have to validate the set of headers, as @@ -424,8 +417,6 @@ public class SpdyProcessor extends Abstr private void sendResponseHead() throws IOException { SpdyFrame rframe = spdy.getFrame(SpdyConnection.TYPE_SYN_REPLY); - // TODO: is closed ? - rframe.streamId = spdyStream.reqFrame.streamId; rframe.associated = 0; MimeHeaders headers = response.getMimeHeaders(); @@ -444,10 +435,8 @@ public class SpdyProcessor extends Abstr bc = mb.getByteChunk(); rframe.headerValue(bc.getBuffer(), bc.getStart(), bc.getLength()); } - rframe.headerName(STATUS, 0, STATUS.length); - if (response.getStatus() == 0) { - rframe.headerValue(OK200, 0, OK200.length); + rframe.addHeader(SpdyFrame.STATUS, SpdyFrame.OK200); } else { // HTTP header contents String message = null; @@ -466,12 +455,13 @@ public class SpdyProcessor extends Abstr // TODO: optimize String status = response.getStatus() + " " + message; byte[] statusB = status.getBytes(); + rframe.headerName(SpdyFrame.STATUS, 0, SpdyFrame.STATUS.length); rframe.headerValue(statusB, 0, statusB.length); } - rframe.headerName(VERSION, 0, VERSION.length); - rframe.headerValue(HTTP11, 0, HTTP11.length); + rframe.addHeader(SpdyFrame.VERSION, SpdyFrame.HTTP11); - spdy.sendFrameBlocking(rframe, spdyStream); + rframe.streamId = spdyStream.reqFrame.streamId; + spdy.send(rframe, spdyStream); // we can't reuse the frame - it'll be queued, the coyote processor // may be reused as well. outCommit = true; Modified: tomcat/trunk/java/org/apache/tomcat/spdy/SpdyConnection.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/spdy/SpdyConnection.java?rev=1299981&r1=1299980&r2=1299981&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/spdy/SpdyConnection.java (original) +++ tomcat/trunk/java/org/apache/tomcat/spdy/SpdyConnection.java Tue Mar 13 05:37:14 2012 @@ -17,8 +17,11 @@ package org.apache.tomcat.spdy; import java.io.IOException; +import java.io.PrintWriter; +import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; @@ -124,10 +127,40 @@ public abstract class SpdyConnection { / private Condition outCondition; + public static final int LONG = 1; + + public static final int CLOSE = -1; + + private SpdyFrame nextFrame; + + /** + * Handles the out queue for blocking sockets. + */ + SpdyFrame out; + + boolean draining = false; + + private int goAway = Integer.MAX_VALUE; + public SpdyConnection(SpdyContext spdyContext) { - this.setSpdyContext(spdyContext); + this.spdyContext = spdyContext; outCondition = framerLock.newCondition(); } + + public String toString() { + return "SpdyCon open=" + channels.size(); + } + + public void dump(PrintWriter out) { + out.println("SpdyConnection open=" + channels.size() + + " outQ:" + outQueue.size()); + for (SpdyStream str: channels.values()) { + str.dump(out); + } + + out.println(); + + } /** * Write. @@ -140,6 +173,8 @@ public abstract class SpdyConnection { / */ public abstract int read(byte[] data, int off, int len) throws IOException; + public abstract void close() throws IOException; + public void setCompressSupport(CompressSupport cs) { compressSupport = cs; } @@ -151,7 +186,7 @@ public abstract class SpdyConnection { / return frame; } - public SpdyFrame getDataFrame() throws IOException { + public SpdyFrame getDataFrame() { SpdyFrame frame = getSpdyContext().getFrame(); return frame; } @@ -167,17 +202,24 @@ public abstract class SpdyConnection { / * - for fully non-blocking write: there will be a drain callback. */ - /** - * Handles the out queue for blocking sockets. - */ - SpdyFrame out; - - boolean draining = false; + public void drain() { + synchronized (nbDrain) { + if (draining) { + return; + } + draining = true; + } + _drain(); + synchronized (nbDrain) { + draining = false; + } + } + /** * Non blocking if the socket is not blocking. */ - private boolean drain() { + private boolean _drain() { while (true) { framerLock.lock(); @@ -190,9 +232,18 @@ public abstract class SpdyConnection { / if (out == null) { return false; } + if (goAway < out.streamId) { + + } SpdyFrame oframe = out; try { - if (oframe.type == TYPE_SYN_STREAM) { + if (!oframe.c) { + // late: IDs are assigned as we send ( priorities may affect + // the transmission order ) + if (oframe.stream != null) { + oframe.streamId = oframe.stream.getRequest().streamId; + } + } else if (oframe.type == TYPE_SYN_STREAM) { oframe.fixNV(18); if (compressSupport != null) { compressSupport.compress(oframe, 18); @@ -211,7 +262,9 @@ public abstract class SpdyConnection { / if (oframe.type == TYPE_SYN_STREAM) { oframe.streamId = outStreamId; outStreamId += 2; - channels.put(oframe.streamId, oframe.stream); + synchronized(channels) { + channels.put(oframe.streamId, oframe.stream); + } } oframe.serializeHead(); @@ -231,15 +284,20 @@ public abstract class SpdyConnection { / try { int toWrite = out.endData - out.off; - int wr = write(out.data, out.off, toWrite); - if (wr < 0) { - return false; - } - if (wr < toWrite) { - out.off += wr; - return true; // non blocking connection - } - out.off += wr; + int wr; + while (toWrite > 0) { + wr = write(out.data, out.off, toWrite); + if (wr < 0) { + return false; + } + if (wr == 0) { + return true; // non blocking or to + } + if (wr <= toWrite) { + out.off += wr; + toWrite -= wr; + } + } // Frame was sent framerLock.lock(); try { @@ -247,6 +305,13 @@ public abstract class SpdyConnection { / } finally { framerLock.unlock(); } + + synchronized (channels) { + if (out.stream != null && + out.stream.finRcvd && out.stream.finSent) { + channels.remove(out.streamId); + } + } out = null; } catch (IOException e) { // connection closed - abort all streams @@ -258,31 +323,6 @@ public abstract class SpdyConnection { / } /** - * Blocking call for sendFrame: must be called from a thread pool. - * - * Will wait until the actual frame is sent. - */ - public void sendFrameBlocking(SpdyFrame oframe, SpdyStream proc) - throws IOException { - queueFrame(oframe, proc, oframe.pri == 0 ? outQueue : prioriyQueue); - - nonBlockingDrain(); - - while (!inClosed) { - framerLock.lock(); - try { - if (oframe.off == oframe.endData) { - return; // was sent - } - outCondition.await(); - } catch (InterruptedException e) { - } finally { - framerLock.unlock(); - } - } - } - - /** * Send as much as possible without blocking. * * With a nb transport it should call drain directly. @@ -296,23 +336,11 @@ public abstract class SpdyConnection { / Runnable nbDrain = new Runnable() { public void run() { - int i = drainCnt++; - long t0 = System.currentTimeMillis(); - synchronized (nbDrain) { - if (draining) { - return; - } - draining = true; - } - drain(); - synchronized (nbDrain) { - draining = false; - } } }; - public void sendFrameNonBlocking(SpdyFrame oframe, SpdyStream proc) + public void send(SpdyFrame oframe, SpdyStream proc) throws IOException { queueFrame(oframe, proc, oframe.pri == 0 ? outQueue : prioriyQueue); nonBlockingDrain(); @@ -326,7 +354,7 @@ public abstract class SpdyConnection { / // We can't assing a stream ID until it is sent - priorities // we can't compress either - it's stateful. oframe.stream = proc; - + framerLock.lock(); try { outQueue.add(oframe); @@ -370,12 +398,6 @@ public abstract class SpdyConnection { / } } - public static final int LONG = 1; - - public static final int CLOSE = -1; - - private SpdyFrame nextFrame; - /** * Non-blocking method, read as much as possible and return. */ @@ -389,61 +411,66 @@ public abstract class SpdyConnection { / inFrame.data = new byte[16 * 1024]; } // we might already have data from previous frame - if (inFrame.endData < 8 || // we don't have the header - inFrame.endData < inFrame.endFrame) { // size != 0 - we - // parsed the header - - int rd = read(inFrame.data, inFrame.endData, - inFrame.data.length - inFrame.endData); - if (rd < 0) { - abort("Closed"); + if (inFrame.endReadData < 8 || // we don't have the header + inFrame.endReadData < inFrame.endData) { + + int rd = read(inFrame.data, inFrame.endReadData, + inFrame.data.length - inFrame.endReadData); + if (rd == -1) { + if (channels.size() == 0) { + return CLOSE; + } else { + abort("Closed"); + } + } else if (rd < 0) { + abort("Closed - read error"); return CLOSE; - } - if (rd == 0) { + } else if (rd == 0) { return LONG; // Non-blocking channel - will resume reading at off } - inFrame.endData += rd; + inFrame.endReadData += rd; } - if (inFrame.endData < 8) { + if (inFrame.endReadData < 8) { continue; // keep reading } - // We got the frame head - if (inFrame.endFrame == 0) { + if (inFrame.endData == 0) { inFrame.parse(); if (inFrame.version != 2) { abort("Wrong version"); return CLOSE; } - // MAx_FRAME_SIZE - if (inFrame.endFrame < 0 || inFrame.endFrame > 32000) { - abort("Framing error, size = " + inFrame.endFrame); + // MAX_FRAME_SIZE + if (inFrame.endData < 0 || inFrame.endData > 32000) { + abort("Framing error, size = " + inFrame.endData); return CLOSE; } - // grow the buffer if needed. no need to copy the head, parsed - // ( maybe for debugging ). - if (inFrame.data.length < inFrame.endFrame) { - inFrame.data = new byte[inFrame.endFrame]; + // TODO: if data, split it in 2 frames + // grow the buffer if needed. + if (inFrame.data.length < inFrame.endData) { + byte[] tmp = new byte[inFrame.endData]; + System.arraycopy(inFrame.data, 0, tmp, 0, inFrame.endReadData); + inFrame.data = tmp; } } - if (inFrame.endData < inFrame.endFrame) { + if (inFrame.endReadData < inFrame.endData) { continue; // keep reading to fill current frame } // else: we have at least the current frame - int extra = inFrame.endData - inFrame.endFrame; + int extra = inFrame.endReadData - inFrame.endData; if (extra > 0) { // and a bit more - to keep things simple for now we // copy them to next frame, at least we saved reads. // it is possible to avoid copy - but later. nextFrame = getSpdyContext().getFrame(); nextFrame.makeSpace(extra); - System.arraycopy(inFrame.data, inFrame.endFrame, + System.arraycopy(inFrame.data, inFrame.endData, nextFrame.data, 0, extra); - nextFrame.endData = extra; - inFrame.endData = inFrame.endFrame; + nextFrame.endReadData = extra; + inFrame.endReadData = inFrame.endData; } // decompress @@ -503,17 +530,38 @@ public abstract class SpdyConnection { / public void abort(String msg) { System.err.println(msg); inClosed = true; - // TODO: close all streams + List<Integer> ch = new ArrayList<Integer>(channels.keySet()); + for (Integer i: ch) { + SpdyStream stream = channels.remove(i); + if (stream != null) { + stream.onReset(); + } + } + } + + public void abort(String msg, int last) { + System.err.println(msg); + inClosed = true; + + List<Integer> ch = new ArrayList<Integer>(channels.keySet()); + for (Integer i: ch) { + if (i > last) { + SpdyStream stream = channels.remove(i); + if (stream != null) { + stream.onReset(); + } + } + } } /** * Process a SPDY connection. Called in a separate thread. - * + * * @return * @throws IOException */ - public int handleFrame() throws IOException { + protected int handleFrame() throws IOException { if (inFrame.c) { switch (inFrame.type) { case TYPE_SETTINGS: { @@ -529,7 +577,11 @@ public abstract class SpdyConnection { / case TYPE_GOAWAY: { int lastStream = inFrame.readInt(); log.info("GOAWAY last=" + lastStream); - abort("GOAWAY"); + + // Server will shut down - but will keep processing the current requests, + // up to lastStream. If we sent any new ones - they need to be canceled. + abort("GO_AWAY", lastStream); + goAway = lastStream; return CLOSE; } case TYPE_RST_STREAM: { @@ -540,12 +592,19 @@ public abstract class SpdyConnection { / + " " + ((errCode < RST_ERRORS.length) ? RST_ERRORS[errCode] : errCode)); - SpdyStream sch = channels.get(inFrame.streamId); + SpdyStream sch; + synchronized(channels) { + sch = channels.get(inFrame.streamId); + } if (sch == null) { abort("Missing channel " + inFrame.streamId); return CLOSE; } sch.onCtlFrame(inFrame); + + synchronized(channels) { + channels.remove(inFrame.streamId); + } inFrame = null; break; } @@ -569,7 +628,10 @@ public abstract class SpdyConnection { / break; } case TYPE_SYN_REPLY: { - SpdyStream sch = channels.get(inFrame.streamId); + SpdyStream sch; + synchronized(channels) { + sch = channels.get(inFrame.streamId); + } if (sch == null) { abort("Missing channel"); return CLOSE; @@ -593,18 +655,26 @@ public abstract class SpdyConnection { / oframe.append32(inFrame.read32()); oframe.pri = 0x80; - sendFrameNonBlocking(oframe, null); + send(oframe, null); break; } } } else { // Data frame - SpdyStream sch = channels.get(inFrame.streamId); + SpdyStream sch; + synchronized (channels) { + sch = channels.get(inFrame.streamId); + } if (sch == null) { abort("Missing channel"); return CLOSE; } sch.onDataFrame(inFrame); + synchronized (channels) { + if (sch.finRcvd && sch.finSent) { + channels.remove(inFrame.streamId); + } + } inFrame = null; } return LONG; @@ -614,14 +684,10 @@ public abstract class SpdyConnection { / return spdyContext; } - public void setSpdyContext(SpdyContext spdyContext) { - this.spdyContext = spdyContext; - } - public SpdyStream get(String host, String url) throws IOException { SpdyStream sch = new SpdyStream(this); - sch.addHeader("host", host); - sch.addHeader("url", url); + sch.getRequest().addHeader("host", host); + sch.getRequest().addHeader("url", url); sch.send(); Modified: tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContext.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContext.java?rev=1299981&r1=1299980&r2=1299981&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContext.java (original) +++ tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContext.java Tue Mar 13 05:37:14 2012 @@ -47,9 +47,12 @@ public class SpdyContext { private Executor executor; - int defaultFrameSize = 8196; + int defaultFrameSize = 8192; - public static boolean debug = true; + public static boolean debug = false; + + boolean tls = true; + boolean compression = true; /** * Get a frame - frames are heavy buffers, may be reused. @@ -62,6 +65,19 @@ public class SpdyContext { } /** + * Set the max frame size. + * + * Larger data packets will be split in multiple frames. + * + * ( the code is currently accepting larger control frames - it's not + * clear if we should just reject them, many servers limit header size - + * the http connector also has a 8k limit - getMaxHttpHeaderSize ) + */ + public void setFrameSize(int frameSize) { + defaultFrameSize = frameSize; + } + + /** * Override for server side to return a custom stream. */ public SpdyStream getStream(SpdyConnection framer) { @@ -104,4 +120,14 @@ public class SpdyContext { public void releaseConnection(SpdyConnection con) { } + + public void listen(final int port, String cert, String key) throws IOException { + throw new IOException("Not implemented"); + } + + /** + * Close all pending connections and free resources. + */ + public void stop() throws IOException { + } } Added: tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContextJni.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContextJni.java?rev=1299981&view=auto ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContextJni.java (added) +++ tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContextJni.java Tue Mar 13 05:37:14 2012 @@ -0,0 +1,182 @@ +/* + */ +package org.apache.tomcat.spdy; + +import java.io.IOException; + +import org.apache.tomcat.jni.Status; +import org.apache.tomcat.jni.socket.AprSocket; +import org.apache.tomcat.jni.socket.AprSocketContext; +import org.apache.tomcat.jni.socket.AprSocketContext.NonBlockingPollHandler; +import org.apache.tomcat.jni.socket.AprSocketContext.TlsCertVerifier; + +public class SpdyContextJni extends SpdyContext { + AprSocketContext con; + + //AprSocketContext socketCtx; + + public SpdyContextJni() { + con = new AprSocketContext(); + //if (insecureCerts) { + con.customVerification(new TlsCertVerifier() { + @Override + public void handshakeDone(AprSocket ch) { + } + }); + //} + con.setNpn("spdy/2"); + } + + @Override + public SpdyConnection getConnection(String host, int port) throws IOException { + SpdyConnectionAprSocket spdy = new SpdyConnectionAprSocket(this); + + AprSocket ch = con.socket(host, port, tls); + + spdy.setSocket(ch); + + ch.connect(); + + ch.setHandler(new SpdySocketHandler(spdy)); + + // need to consume the input to receive more read events + int rc = spdy.processInput(); + if (rc == SpdyConnection.CLOSE) { + ch.close(); + throw new IOException("Error connecting"); + } + + return spdy; + } + + public void onAccept(long socket) throws IOException { + SpdyConnectionAprSocket spdy = new SpdyConnectionAprSocket(SpdyContextJni.this); + AprSocket s = con.socket(socket); + spdy.setSocket(s); + + SpdySocketHandler handler = new SpdySocketHandler(spdy); + s.setHandler(handler); + handler.process(s, true, true, false); + } + + public void listen(final int port, String cert, String key) throws IOException { + con = new AprSocketContext() { + protected void onSocket(AprSocket s) throws IOException { + SpdyConnectionAprSocket spdy = new SpdyConnectionAprSocket(SpdyContextJni.this); + spdy.setSocket(s); + + SpdySocketHandler handler = new SpdySocketHandler(spdy); + s.setHandler(handler); + } + }; + + con.setNpn(SpdyContext.SPDY_NPN_OUT); + con.setKeys(cert, key); + + con.listen(port); + } + + public void stop() throws IOException { + con.stop(); + } + + public AprSocketContext getAprContext() { + return con; + } + + // NB + class SpdySocketHandler implements NonBlockingPollHandler { + SpdyConnection con; + + SpdySocketHandler(SpdyConnection con) { + this.con = con; + } + + @Override + public void closed(AprSocket ch) { + // not used ( polling not implemented yet ) + } + + @Override + public void process(AprSocket ch, boolean in, boolean out, boolean close) { + try { + int rc = con.processInput(); + if (rc == SpdyConnection.CLOSE) { + ch.close(); + } + con.drain(); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + ch.reset(); + } + } + + @Override + public void connected(AprSocket ch) { + } + + @Override + public void error(AprSocket ch, Throwable t) { + } + + } + + public static class SpdyConnectionAprSocket extends SpdyConnection { + AprSocket socket; + + public SpdyConnectionAprSocket(SpdyContext spdyContext) { + super(spdyContext); + //setCompressSupport(new CompressJzlib()); + if (spdyContext.compression) { + setCompressSupport(new CompressDeflater6()); + } + } + + public void setSocket(AprSocket ch) { + this.socket = ch; + } + + @Override + public void close() throws IOException { + socket.close(); + } + + @Override + public int write(byte[] data, int off, int len) throws IOException { + if (socket == null) { + return -1; + } + int sent = socket.write(data, off, len); + if (sent < 0) { + return -1; + } + return sent; + } + + /** + * @throws IOException + */ + @Override + public int read(byte[] data, int off, int len) throws IOException { + if (socket == null) { + return -1; + } + int rd = socket.read(data, off, len); + // org.apache.tomcat.jni.Socket.recv(socket, data, off, len); + if (rd == -Status.APR_EOF) { + return -1; + } + if (rd == -Status.TIMEUP || rd == -Status.EINTR || rd == -Status.EAGAIN) { + rd = 0; + } + if (rd < 0) { + return -1; + } + off += rd; + len -= rd; + return rd; + } + } + +} Modified: tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContextProxy.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContextProxy.java?rev=1299981&r1=1299980&r2=1299981&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContextProxy.java (original) +++ tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContextProxy.java Tue Mar 13 05:37:14 2012 @@ -17,8 +17,10 @@ package org.apache.tomcat.spdy; import java.io.IOException; +import java.net.ServerSocket; import java.net.Socket; import java.net.SocketTimeoutException; +import java.util.concurrent.Semaphore; /** * Spdy context for 'proxy' or test mode spdy - no NPN, no SSL, no compression. @@ -78,6 +80,11 @@ public class SpdyContextProxy extends Sp } @Override + public void close() throws IOException { + socket.close(); + } + + @Override public synchronized int write(byte[] data, int off, int len) throws IOException { socket.getOutputStream().write(data, off, len); return len; @@ -92,4 +99,52 @@ public class SpdyContextProxy extends Sp } } } + + + boolean running = true; + ServerSocket serverSocket; + + public void stop() throws IOException { + running = false; + serverSocket.close(); + } + + /** + * For small servers/testing: run in server mode. + * Need to override onSynStream() to implement the logic. + */ + public void listen(final int port, String cert, String key) throws IOException { + getExecutor().execute(new Runnable() { + @Override + public void run() { + accept(port); + } + }); + } + + private void accept(int port) { + try { + serverSocket = new ServerSocket(port); + while (running) { + final Socket socket = serverSocket.accept(); + final SpdyConnection con = getConnection(socket); + getExecutor().execute(new Runnable() { + @Override + public void run() { + con.onBlockingSocket(); + try { + socket.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + }); + } + } catch (IOException ex) { + if (running) { + ex.printStackTrace(); + } + running = false; + } + } } Modified: tomcat/trunk/java/org/apache/tomcat/spdy/SpdyFrame.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/spdy/SpdyFrame.java?rev=1299981&r1=1299980&r2=1299981&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/spdy/SpdyFrame.java (original) +++ tomcat/trunk/java/org/apache/tomcat/spdy/SpdyFrame.java Tue Mar 13 05:37:14 2012 @@ -16,7 +16,18 @@ */ package org.apache.tomcat.spdy; +import java.util.Map; + public class SpdyFrame { + public static byte[] STATUS = "status".getBytes(); + + public static byte[] VERSION = "version".getBytes(); + + public static byte[] HTTP11 = "HTTP/1.1".getBytes(); + + public static byte[] OK200 = "200 OK".getBytes(); + + // This is a bit more complicated, to avoid multiple reads/writes. // We'll read as much as possible - possible past frame end. This may // cost an extra copy - or even more complexity for dealing with slices @@ -25,18 +36,21 @@ public class SpdyFrame { public int off = 8; // used when reading - current offset - public int endFrame; // end of frame == size + 8 + int endReadData; // how much has been read ( may be more or less than a frame ) // On write it is incremented. - public int endData; // end of data in the buffer (may be past frame end) - + /** + * end of data in the buffer. + */ + public int endData; + // Processed data from the frame boolean c; // for control int version; - private int flags; + int flags; public int type; @@ -62,7 +76,7 @@ public class SpdyFrame { public void recyle() { type = 0; c = false; - endFrame = 0; + endReadData = 0; off = 8; streamId = 0; nvCount = 0; @@ -76,10 +90,10 @@ public class SpdyFrame { } return "C" + " S=" + streamId + (flags != 0 ? " F=" + flags : "") + (version != 2 ? " v" + version : "") + " t=" + type - + " L=" + endFrame + "/" + off; + + " L=" + endData + "/" + off; } else { return "D" + " S=" + streamId + (flags != 0 ? " F=" + flags : "") - + " L=" + endFrame + "/" + off; + + " L=" + endData + "/" + off; } } @@ -118,7 +132,7 @@ public class SpdyFrame { } public boolean parse() { - endFrame = 0; + endData = 0; streamId = 0; nvCount = 0; @@ -142,12 +156,12 @@ public class SpdyFrame { flags = data[4] & 0xFF; for (int i = 5; i < 8; i++) { b0 = data[i] & 0xFF; - endFrame = endFrame << 8 | b0; + endData = endData << 8 | b0; } // size will represent the end of the data ( header is held in same // buffer) - endFrame += 8; + endData += 8; return true; } @@ -226,6 +240,37 @@ public class SpdyFrame { nvCount++; headerValue(buf, soff, len); } + + public void addHeader(String name, String value) { + byte[] nameB = name.getBytes(); + headerName(nameB, 0, nameB.length); + nameB = value.getBytes(); + headerValue(nameB, 0, nameB.length); + } + + public void addHeader(byte[] nameB, String value) { + headerName(nameB, 0, nameB.length); + nameB = value.getBytes(); + headerValue(nameB, 0, nameB.length); + } + + public void addHeader(byte[] nameB, byte[] valueB) { + headerName(nameB, 0, nameB.length); + headerValue(valueB, 0, valueB.length); + } + + public void getHeaders(Map<String, String> resHeaders) { + for (int i = 0; i < nvCount; i++) { + int len = read16(); + String n = new String(data, off, len, SpdyStream.UTF8); + advance(len); + len = read16(); + String v = new String(data, off, len, SpdyStream.UTF8); + advance(len); + resHeaders.put(n, v); + } + } + // TODO: instead of that, use byte[][] void makeSpace(int len) { @@ -294,11 +339,14 @@ public class SpdyFrame { } public int remaining() { - return endFrame - off; + return endData - off; } public void advance(int cnt) { off += cnt; } -} \ No newline at end of file + public boolean isData() { + return !c; + } +} Modified: tomcat/trunk/java/org/apache/tomcat/spdy/SpdyStream.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/spdy/SpdyStream.java?rev=1299981&r1=1299980&r2=1299981&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/spdy/SpdyStream.java (original) +++ tomcat/trunk/java/org/apache/tomcat/spdy/SpdyStream.java Tue Mar 13 05:37:14 2012 @@ -17,6 +17,7 @@ package org.apache.tomcat.spdy; import java.io.IOException; +import java.io.PrintWriter; import java.nio.charset.Charset; import java.util.Map; import java.util.concurrent.BlockingQueue; @@ -43,20 +44,44 @@ public class SpdyStream { public SpdyFrame reqFrame; - SpdyFrame resFrame; + public SpdyFrame resFrame; - BlockingQueue<SpdyFrame> inData = new LinkedBlockingQueue<SpdyFrame>(); - - public static final SpdyFrame END_FRAME = new SpdyFrame(16); + /** + * For blocking support. + */ + protected BlockingQueue<SpdyFrame> inData = new LinkedBlockingQueue<SpdyFrame>(); - boolean finSent; + protected boolean finSent; protected boolean finRcvd; + + /** + * Dummy data frame to insert on reset / go away + */ + static SpdyFrame END_FRAME; + + static { + END_FRAME = new SpdyFrame(16); + END_FRAME.endData = 0; + END_FRAME.off = 0; + END_FRAME.c = false; + END_FRAME.flags =SpdyConnection.FLAG_HALF_CLOSE; + } public SpdyStream(SpdyConnection spdy) { this.spdy = spdy; } + public void dump(PrintWriter out) { + if (reqFrame != null) { + out.println("Req: " + reqFrame); + } + if (resFrame != null) { + out.println("Res: " + resFrame); + } + out.println("In: " + inData.size() + (finRcvd ? " FIN":"")); + } + /** * Non-blocking, called when a data frame is received. * @@ -64,10 +89,11 @@ public class SpdyStream { * buffer ( to avoid a copy ). */ public void onDataFrame(SpdyFrame inFrame) { - inData.add(inFrame); - if (inFrame.closed()) { - finRcvd = true; - inData.add(END_FRAME); + synchronized(this) { + inData.add(inFrame); + if (inFrame.closed()) { + finRcvd = true; + } } } @@ -84,29 +110,61 @@ public class SpdyStream { reqFrame = frame; } else if (frame.type == SpdyConnection.TYPE_SYN_REPLY) { resFrame = frame; + } else if (frame.type == SpdyConnection.TYPE_RST_STREAM) { + onReset(); } - if (frame.isHalfClose()) { - finRcvd = true; + synchronized (this) { + inData.add(frame); + if (frame.isHalfClose()) { + finRcvd = true; + } } } + /** + * Called on GOAWAY or reset. + */ + public void onReset() { + finRcvd = true; + finSent = true; + + // To unblock + inData.add(END_FRAME); + } + /** * True if the channel both received and sent FIN frames. - * + * * This is tracked by the processor, to avoid extra storage in framer. */ public boolean isFinished() { return finSent && finRcvd; } - - public SpdyFrame getIn(long to) throws IOException { + + /** + * Waits and return the next data frame. + */ + public SpdyFrame getDataFrame(long to) throws IOException { + while (true) { + SpdyFrame res = getFrame(to); + if (res == null || res.isData()) { + return res; + } + } + } + + /** + * Waits and return the next frame. First frame will be the control frame + */ + public SpdyFrame getFrame(long to) throws IOException { SpdyFrame in; try { - if (inData.size() == 0 && finRcvd) { - return null; + synchronized (this) { + if (inData.size() == 0 && finRcvd) { + return null; + } } in = inData.poll(to, TimeUnit.MILLISECONDS); - if (in == END_FRAME) { return null; } @@ -137,14 +195,14 @@ public class SpdyStream { return reqFrame; } - public void addHeader(String name, String value) { - byte[] nameB = name.getBytes(); - getRequest().headerName(nameB, 0, nameB.length); - nameB = value.getBytes(); - reqFrame.headerValue(nameB, 0, nameB.length); + public SpdyFrame getResponse() { + if (resFrame == null) { + resFrame = spdy.getFrame(SpdyConnection.TYPE_SYN_REPLY); + resFrame.streamId = reqFrame.streamId; + } + return resFrame; } - public synchronized void sendDataFrame(byte[] data, int start, int length, boolean close) throws IOException { @@ -159,12 +217,11 @@ public class SpdyStream { // 1 tcp packet. That's the current choice, seems closer to rest of // tomcat - oframe.streamId = reqFrame.streamId; if (close) oframe.halfClose(); oframe.append(data, start, length); - spdy.sendFrameBlocking(oframe, this); + spdy.send(oframe, this); } public void send() throws IOException { @@ -172,8 +229,8 @@ public class SpdyStream { } public void send(String host, String url, String scheme, String method) throws IOException { - addHeader("host", host); - addHeader("url", url); + getRequest().addHeader("host", host); + getRequest().addHeader("url", url); send(scheme, method); } @@ -184,13 +241,14 @@ public class SpdyStream { // TODO: add the others reqFrame.halfClose(); } - addHeader("scheme", "http"); // todo - addHeader("method", method); - addHeader("version", "HTTP/1.1"); + getRequest().addHeader("scheme", "http"); // todo + getRequest().addHeader("method", method); + getRequest().addHeader("version", "HTTP/1.1"); if (reqFrame.isHalfClose()) { finSent = true; } - spdy.sendFrameBlocking(reqFrame, this); + spdy.send(reqFrame, this); } -} \ No newline at end of file + +} --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org