Author: markt Date: Mon Jan 14 22:24:09 2013 New Revision: 1433175 URL: http://svn.apache.org/viewvc?rev=1433175&view=rev Log: First pass at getting HTTP upgrade working for APR/native. I'm testing this with WebSocket. Autobahn doesn't trigger a crash :) but there are a number of failures I still need to investigate :(
Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractProcessor.java tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractServletInputStream.java tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractServletOutputStream.java tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprServletInputStream.java tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprServletOutputStream.java tomcat/trunk/java/org/apache/coyote/http11/upgrade/LocalStrings.properties tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractProcessor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractProcessor.java?rev=1433175&r1=1433174&r2=1433175&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractProcessor.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractProcessor.java Mon Jan 14 22:24:09 2013 @@ -89,7 +89,8 @@ public abstract class AbstractProcessor< // Unexpected state return SocketState.CLOSED; } - if (upgradeServletInputStream.isCloseRequired()) { + if (upgradeServletInputStream.isCloseRequired() || + upgradeServletOutputStream.isCloseRequired()) { return SocketState.CLOSED; } return SocketState.UPGRADED; Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractServletInputStream.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractServletInputStream.java?rev=1433175&r1=1433174&r2=1433175&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractServletInputStream.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractServletInputStream.java Mon Jan 14 22:24:09 2013 @@ -28,7 +28,7 @@ public abstract class AbstractServletInp protected static final StringManager sm = StringManager.getManager(Constants.Package); - private boolean closeRequired = false; + private volatile boolean closeRequired = false; // Start in blocking-mode private volatile Boolean ready = Boolean.TRUE; private volatile ReadListener listener = null; @@ -124,6 +124,7 @@ public abstract class AbstractServletInp @Override public void close() throws IOException { + closeRequired = true; doClose(); } @@ -176,6 +177,11 @@ public abstract class AbstractServletInp protected abstract boolean doIsReady() throws IOException; + /** + * Abstract method to be overridden by concrete implementations. The base + * class will ensure that there are no concurrent calls to this method for + * the same socket. + */ protected abstract int doRead(boolean block, byte[] b, int off, int len) throws IOException; Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractServletOutputStream.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractServletOutputStream.java?rev=1433175&r1=1433174&r2=1433175&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractServletOutputStream.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractServletOutputStream.java Mon Jan 14 22:24:09 2013 @@ -31,6 +31,7 @@ public abstract class AbstractServletOut private final Object fireListenerLock = new Object(); private final Object writeLock = new Object(); + private volatile boolean closeRequired = false; // Start in blocking-mode private volatile WriteListener listener = null; private volatile boolean fireListener = false; @@ -61,6 +62,10 @@ public abstract class AbstractServletOut this.listener = listener; } + protected final boolean isCloseRequired() { + return closeRequired; + } + @Override public void write(int b) throws IOException { preWriteChecks(); @@ -79,6 +84,7 @@ public abstract class AbstractServletOut @Override public void close() throws IOException { + closeRequired = true; doClose(); } @@ -132,6 +138,11 @@ public abstract class AbstractServletOut } } + /** + * Abstract method to be overridden by concrete implementations. The base + * class will ensure that there are no concurrent calls to this method for + * the same socket. + */ protected abstract int doWrite(boolean block, byte[] b, int off, int len) throws IOException; Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprServletInputStream.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprServletInputStream.java?rev=1433175&r1=1433174&r2=1433175&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprServletInputStream.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprServletInputStream.java Mon Jan 14 22:24:09 2013 @@ -17,69 +17,93 @@ package org.apache.coyote.http11.upgrade; import java.io.IOException; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; +import org.apache.tomcat.jni.Socket; +import org.apache.tomcat.jni.Status; import org.apache.tomcat.util.net.SocketWrapper; public class AprServletInputStream extends AbstractServletInputStream { + private final SocketWrapper<Long> wrapper; private final long socket; + private final Lock blockingStatusReadLock; + private final WriteLock blockingStatusWriteLock; + private volatile boolean eagain = false; public AprServletInputStream(SocketWrapper<Long> wrapper) { + this.wrapper = wrapper; this.socket = wrapper.getSocket().longValue(); + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + this.blockingStatusReadLock = lock.readLock(); + this.blockingStatusWriteLock =lock.writeLock(); } -/* - @Override - protected int doRead() throws IOException { - byte[] bytes = new byte[1]; - int result = Socket.recv(socket, bytes, 0, 1); - if (result == -1) { - return -1; - } else { - return bytes[0] & 0xFF; - } - } + @Override - protected int doRead(byte[] b, int off, int len) throws IOException { - boolean block = true; - if (!block) { - Socket.optSet(socket, Socket.APR_SO_NONBLOCK, -1); - } + protected int doRead(boolean block, byte[] b, int off, int len) + throws IOException { + + boolean readDone = false; + int result = 0; try { - int result = Socket.recv(socket, b, off, len); - if (result > 0) { - return result; - } else if (-result == Status.EAGAIN) { - return 0; - } else { - throw new IOException(sm.getString("apr.error", - Integer.valueOf(-result))); + blockingStatusReadLock.lock(); + if (wrapper.getBlockingStatus() == block) { + result = Socket.recv(socket, b, off, len); + readDone = true; } } finally { - if (!block) { - Socket.optSet(socket, Socket.APR_SO_NONBLOCK, 0); + blockingStatusReadLock.unlock(); + } + + if (!readDone) { + try { + blockingStatusWriteLock.lock(); + wrapper.setBlockingStatus(block); + // Set the current settings for this socket + Socket.optSet(socket, Socket.APR_SO_NONBLOCK, (block ? 0 : 1)); + // Downgrade the lock + try { + blockingStatusReadLock.lock(); + blockingStatusWriteLock.unlock(); + result = Socket.recv(socket, b, off, len); + } finally { + blockingStatusReadLock.unlock(); + } + } finally { + // Should have been released above but may not have been on some + // exception paths + if (blockingStatusWriteLock.isHeldByCurrentThread()) { + blockingStatusWriteLock.unlock(); + } } } - } -} -*/ - @Override - protected int doRead(boolean block, byte[] b, int off, int len) - throws IOException { - // TODO Auto-generated method stub - return 0; + if (result > 0) { + eagain = false; + return result; + } else if (-result == Status.EAGAIN) { + eagain = true; + return 0; + } else { + throw new IOException(sm.getString("apr.read.error", + Integer.valueOf(-result))); + } } + @Override protected boolean doIsReady() { - // TODO Auto-generated method stub - return false; + return !eagain; } + @Override protected void doClose() throws IOException { - // TODO Auto-generated method stub + // NO-OP + // Let AbstractProcessor trigger the close } } Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprServletOutputStream.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprServletOutputStream.java?rev=1433175&r1=1433174&r2=1433175&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprServletOutputStream.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprServletOutputStream.java Mon Jan 14 22:24:09 2013 @@ -17,24 +17,74 @@ package org.apache.coyote.http11.upgrade; import java.io.IOException; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; +import org.apache.tomcat.jni.Socket; import org.apache.tomcat.util.net.SocketWrapper; public class AprServletOutputStream extends AbstractServletOutputStream { + private final SocketWrapper<Long> wrapper; private final long socket; - + private final Lock blockingStatusReadLock; + private final WriteLock blockingStatusWriteLock; public AprServletOutputStream(SocketWrapper<Long> wrapper) { + this.wrapper = wrapper; this.socket = wrapper.getSocket().longValue(); + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + this.blockingStatusReadLock = lock.readLock(); + this.blockingStatusWriteLock =lock.writeLock(); } @Override protected int doWrite(boolean block, byte[] b, int off, int len) throws IOException { - // TODO Auto-generated method stub - return 0; + + boolean writeDone = false; + int result = 0; + try { + blockingStatusReadLock.lock(); + if (wrapper.getBlockingStatus() == block) { + result = Socket.send(socket, b, off, len); + writeDone = true; + } + } finally { + blockingStatusReadLock.unlock(); + } + + if (!writeDone) { + try { + blockingStatusWriteLock.lock(); + wrapper.setBlockingStatus(block); + // Set the current settings for this socket + Socket.optSet(socket, Socket.APR_SO_NONBLOCK, (block ? -1 : 0)); + // Downgrade the lock + try { + blockingStatusReadLock.lock(); + blockingStatusWriteLock.unlock(); + result = Socket.send(socket, b, off, len); + } finally { + blockingStatusReadLock.unlock(); + } + } finally { + // Should have been released above but may not have been on some + // exception paths + if (blockingStatusWriteLock.isHeldByCurrentThread()) { + blockingStatusWriteLock.unlock(); + } + } + } + + if (result < 0) { + throw new IOException(sm.getString("apr.write.error", + Integer.valueOf(-result))); + } + + return result; } @Override @@ -45,6 +95,7 @@ public class AprServletOutputStream exte @Override protected void doClose() throws IOException { - // TODO Auto-generated method stub + // NO-OP + // Let AbstractProcessor trigger the close } } Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/LocalStrings.properties URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/LocalStrings.properties?rev=1433175&r1=1433174&r2=1433175&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/upgrade/LocalStrings.properties (original) +++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/LocalStrings.properties Mon Jan 14 22:24:09 2013 @@ -21,7 +21,8 @@ upgrade.sos.canWrite.ise=It is illegal t upgrade.sos.writeListener.null=It is illegal to pass null to setWriteListener() upgrade.sis.write.ise=It is illegal to call any of the write() methods in non-blocking mode without first checking that there is space available by calling canWrite() -apr.error=Unexpected error [{0}] reading data from the APR/native socket. +apr.read.error=Unexpected error [{0}] reading data from the APR/native socket. +apr.write.error=Unexpected error [{0}] writing data to the APR/native socket. nio.eof.error=Unexpected EOF read on the socket Modified: tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java?rev=1433175&r1=1433174&r2=1433175&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java Mon Jan 14 22:24:09 2013 @@ -37,7 +37,11 @@ public class SocketWrapper<E> { private int remotePort = -1; private String remoteHost = null; private String remoteAddr = null; - + /* + * Used if block/non-blocking is set at the socket level. The client is + * responsible for the thread-safe use of this field. + */ + private volatile boolean blockingStatus = true; public SocketWrapper(E socket) { this.socket = socket; @@ -74,4 +78,8 @@ public class SocketWrapper<E> { public void setRemoteHost(String remoteHost) {this.remoteHost = remoteHost; } public String getRemoteAddr() { return remoteAddr; } public void setRemoteAddr(String remoteAddr) {this.remoteAddr = remoteAddr; } + public boolean getBlockingStatus() { return blockingStatus; } + public void setBlockingStatus(boolean blockingStatus) { + this.blockingStatus = blockingStatus; + } } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org