Author: markt
Date: Fri Sep 25 20:29:30 2015
New Revision: 1705349
URL: http://svn.apache.org/viewvc?rev=1705349&view=rev
Log:
More work on servlet 3.1 non-blocking for HTTP/2. NumberWriter works.
Modified:
tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java
tomcat/trunk/java/org/apache/coyote/ActionCode.java
tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java
tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java
tomcat/trunk/java/org/apache/coyote/http2/LocalStrings.properties
tomcat/trunk/java/org/apache/coyote/http2/Stream.java
tomcat/trunk/java/org/apache/coyote/http2/StreamProcessor.java
Modified: tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java?rev=1705349&r1=1705348&r2=1705349&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java Fri Sep 25
20:29:30 2015
@@ -78,7 +78,8 @@ public abstract class AbstractProcessor
}
- private AbstractProcessor(AbstractEndpoint<?> endpoint, Request
coyoteRequest, Response coyoteResponse) {
+ private AbstractProcessor(AbstractEndpoint<?> endpoint, Request
coyoteRequest,
+ Response coyoteResponse) {
this.endpoint = endpoint;
asyncStateMachine = new AsyncStateMachine(this);
request = coyoteRequest;
Modified: tomcat/trunk/java/org/apache/coyote/ActionCode.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ActionCode.java?rev=1705349&r1=1705348&r2=1705349&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/ActionCode.java (original)
+++ tomcat/trunk/java/org/apache/coyote/ActionCode.java Fri Sep 25 20:29:30 2015
@@ -195,13 +195,15 @@ public enum ActionCode {
/**
* Indicator that Servlet is interested in being
- * notified when data is available to be read
+ * notified when data is available to be read.
*/
NB_READ_INTEREST,
/**
- *Indicator that the Servlet is interested
- *in being notified when it can write data
+ * Used with non-blocking writes to determine if a write is currently
+ * allowed (sets passed parameter to <code>true</code>) or not (sets passed
+ * parameter to <code>false</code>). If a write is not allowed then
callback
+ * will be triggered at some future point when write becomes possible
again.
*/
NB_WRITE_INTEREST,
Modified: tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java?rev=1705349&r1=1705348&r2=1705349&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java Fri Sep 25
20:29:30 2015
@@ -147,4 +147,6 @@ abstract class AbstractStream {
protected abstract String getConnectionId();
protected abstract int getWeight();
+
+ protected abstract void doNotifyAll();
}
Modified: tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java?rev=1705349&r1=1705348&r2=1705349&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java
(original)
+++ tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java Fri Sep
25 20:29:30 2015
@@ -638,7 +638,7 @@ public class Http2UpgradeHandler extends
if (allocation > 0) {
backLogSize -= allocation;
synchronized (entry.getKey()) {
- entry.getKey().notifyAll();
+ entry.getKey().doNotifyAll();
}
}
}
@@ -646,6 +646,13 @@ public class Http2UpgradeHandler extends
}
+
+ @Override
+ protected synchronized void doNotifyAll() {
+ this.notifyAll();
+ }
+
+
private int allocate(AbstractStream stream, int allocation) {
if (log.isDebugEnabled()) {
log.debug(sm.getString("upgradeHandler.allocate.debug",
getConnectionId(),
Modified: tomcat/trunk/java/org/apache/coyote/http2/LocalStrings.properties
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/LocalStrings.properties?rev=1705349&r1=1705348&r2=1705349&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/LocalStrings.properties (original)
+++ tomcat/trunk/java/org/apache/coyote/http2/LocalStrings.properties Fri Sep
25 20:29:30 2015
@@ -72,7 +72,10 @@ stream.write=Connection [{0}], Stream [{
stream.outputBuffer.flush.debug=Connection [{0}], Stream [{1}], flushing
output with buffer at position [{2}], writeInProgress [{3}] and closed [{4}]
+streamProcessor.dispatch=Connection [{0}], Stream [{1}], status [{2}]
streamProcessor.httpupgrade.notsupported=HTTP upgrade is not supported within
HTTP/2 streams
+streamProcessor.process.loopend=Connection [{0}], Stream [{1}], loop end,
state [{2}], dispatches [{3}]
+streamProcessor.process.loopstart=Connection [{0}], Stream [{1}], loop start,
status [{2}], dispatches [{3}]
streamProcessor.ssl.error=Unable to retrieve SSL request attributes
streamStateMachine.debug.change=Connection [{0}], Stream [{1}], State changed
from [{2}] to [{3}]
Modified: tomcat/trunk/java/org/apache/coyote/http2/Stream.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Stream.java?rev=1705349&r1=1705348&r2=1705349&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/Stream.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http2/Stream.java Fri Sep 25 20:29:30
2015
@@ -20,6 +20,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
+import org.apache.coyote.ActionCode;
import org.apache.coyote.InputBuffer;
import org.apache.coyote.OutputBuffer;
import org.apache.coyote.Request;
@@ -134,11 +135,15 @@ public class Stream extends AbstractStre
}
- private synchronized int reserveWindowSize(int reservation) throws
IOException {
+ private synchronized int reserveWindowSize(int reservation, boolean block)
throws IOException {
long windowSize = getWindowSize();
while (windowSize < 1) {
try {
- wait();
+ if (block) {
+ wait();
+ } else {
+ return 0;
+ }
} catch (InterruptedException e) {
// Possible shutdown / rst or similar. Use an IOException to
// signal to the client that further I/O isn't possible for
this
@@ -159,6 +164,20 @@ public class Stream extends AbstractStre
@Override
+ protected synchronized void doNotifyAll() {
+ if (coyoteResponse.getWriteListener() == null) {
+ // Blocking IO so thread will be waiting. Release it.
+ // Use notifyAll() to be safe (should be unnecessary)
+ this.notifyAll();
+ } else {
+ if (outputBuffer.isRegisteredForWrite()) {
+ coyoteResponse.action(ActionCode.DISPATCH_WRITE, null);
+ }
+ }
+ }
+
+
+ @Override
public void emitHeader(String name, String value, boolean neverIndex) {
if (log.isDebugEnabled()) {
log.debug(sm.getString("stream.header.debug", getConnectionId(),
getIdentifier(),
@@ -226,7 +245,7 @@ public class Stream extends AbstractStre
if (log.isDebugEnabled()) {
log.debug(sm.getString("stream.write", getConnectionId(),
getIdentifier()));
}
- outputBuffer.flush();
+ outputBuffer.flush(true);
}
@@ -308,6 +327,7 @@ public class Stream extends AbstractStre
private volatile long written = 0;
private volatile boolean closed = false;
private volatile boolean endOfStreamSent = false;
+ private volatile boolean writeInterest = false;
/* The write methods are synchronized to ensure that only one thread at
* a time is able to access the buffer. Without this protection, a
@@ -330,22 +350,25 @@ public class Stream extends AbstractStre
if (len > 0 && !buffer.hasRemaining()) {
// Only flush if we have more data to write and the buffer
// is full
- flush(true);
+ if (flush(true, coyoteResponse.getWriteListener() ==
null)) {
+ break;
+ }
}
}
written += offset;
return offset;
}
- public synchronized void flush() throws IOException {
- flush(false);
+ public synchronized boolean flush(boolean block) throws IOException {
+ return flush(false, block);
}
- private synchronized void flush(boolean writeInProgress) throws
IOException {
+ private synchronized boolean flush(boolean writeInProgress, boolean
block)
+ throws IOException {
if (log.isDebugEnabled()) {
- log.debug(sm.getString("stream.outputBuffer.flush.debug",
getConnectionId(), getIdentifier(),
- Integer.toString(buffer.position()),
Boolean.toString(writeInProgress),
- Boolean.toString(closed)));
+ log.debug(sm.getString("stream.outputBuffer.flush.debug",
getConnectionId(),
+ getIdentifier(), Integer.toString(buffer.position()),
+ Boolean.toString(writeInProgress),
Boolean.toString(closed)));
}
if (!coyoteResponse.isCommitted()) {
coyoteResponse.sendHeaders();
@@ -357,12 +380,17 @@ public class Stream extends AbstractStre
handler.writeBody(Stream.this, buffer, 0, true);
}
// Buffer is empty. Nothing to do.
- return;
+ return false;
}
buffer.flip();
int left = buffer.remaining();
while (left > 0) {
- int streamReservation = reserveWindowSize(left);
+ int streamReservation = reserveWindowSize(left, block);
+ if (streamReservation == 0) {
+ // Must be non-blocking
+ buffer.compact();
+ return true;
+ }
while (streamReservation > 0) {
int connectionReservation =
handler.reserveWindowSize(Stream.this,
streamReservation);
@@ -375,6 +403,25 @@ public class Stream extends AbstractStre
}
}
buffer.clear();
+ return false;
+ }
+
+ synchronized boolean isReady() {
+ if (getWindowSize() > 0 && handler.getWindowSize() > 0) {
+ return true;
+ } else {
+ writeInterest = true;
+ return false;
+ }
+ }
+
+ synchronized boolean isRegisteredForWrite() {
+ if (writeInterest) {
+ writeInterest = false;
+ return true;
+ } else {
+ return false;
+ }
}
@Override
Modified: tomcat/trunk/java/org/apache/coyote/http2/StreamProcessor.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/StreamProcessor.java?rev=1705349&r1=1705348&r2=1705349&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/StreamProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http2/StreamProcessor.java Fri Sep 25
20:29:30 2015
@@ -17,20 +17,28 @@
package org.apache.coyote.http2;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
+import javax.servlet.RequestDispatcher;
import javax.servlet.http.HttpUpgradeHandler;
import org.apache.coyote.AbstractProcessor;
import org.apache.coyote.ActionCode;
import org.apache.coyote.Adapter;
import org.apache.coyote.AsyncContextCallback;
-import org.apache.coyote.AsyncStateMachine;
import org.apache.coyote.ContainerThreadMarker;
+import org.apache.coyote.ErrorState;
+import org.apache.coyote.RequestInfo;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
+import org.apache.tomcat.util.ExceptionUtils;
import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
+import org.apache.tomcat.util.net.DispatchType;
import org.apache.tomcat.util.net.SSLSupport;
import org.apache.tomcat.util.net.SocketStatus;
import org.apache.tomcat.util.net.SocketWrapperBase;
@@ -42,7 +50,7 @@ public class StreamProcessor extends Abs
private static final StringManager sm =
StringManager.getManager(StreamProcessor.class);
private final Stream stream;
- private final AsyncStateMachine asyncStateMachine;
+ private Set<DispatchType> dispatches = new CopyOnWriteArraySet<>();
private volatile SSLSupport sslSupport;
@@ -50,36 +58,61 @@ public class StreamProcessor extends Abs
public StreamProcessor(Stream stream, Adapter adapter,
SocketWrapperBase<?> socketWrapper) {
super(stream.getCoyoteRequest(), stream.getCoyoteResponse());
this.stream = stream;
- asyncStateMachine = new AsyncStateMachine(this);
setAdapter(adapter);
setSocketWrapper(socketWrapper);
}
@Override
- public void run() {
- // HTTP/2 equivalent of AbstractConnectionHandler#process()
+ public synchronized void run() {
+ process(SocketStatus.OPEN_READ);
+ }
+
+
+ private synchronized void process(SocketStatus status) {
+ // HTTP/2 equivalent of AbstractConnectionHandler#process() without the
+ // socket <-> processor mapping
ContainerThreadMarker.set();
SocketState state = SocketState.CLOSED;
try {
+ Iterator<DispatchType> dispatches =
getIteratorAndClearDispatches();
do {
- if (asyncStateMachine.isAsync()) {
- adapter.asyncDispatch(request, response,
SocketStatus.OPEN_READ);
+ if (log.isDebugEnabled()) {
+ log.debug(sm.getString("streamProcessor.process.loopstart",
+ stream.getConnectionId(), stream.getIdentifier(),
status, dispatches));
+ }
+ // TODO CLOSE_NOW ?
+ if (dispatches != null) {
+ DispatchType nextDispatch = dispatches.next();
+ state = dispatch(nextDispatch.getSocketStatus());
+ // TODO DISCONNECT ?
+ } else if (isAsync()) {
+ state = dispatch(status);
} else if (state == SocketState.ASYNC_END) {
- adapter.asyncDispatch(request, response,
SocketStatus.OPEN_READ);
- // Only ever one request per stream so always treat as
- // closed at this point.
- state = SocketState.CLOSED;
+ state = dispatch(status);
} else {
- adapter.service(request, response);
+ state = process((SocketWrapperBase<?>) null);
}
- if (asyncStateMachine.isAsync()) {
+ if (state != SocketState.CLOSED && isAsync()) {
state = asyncStateMachine.asyncPostProcess();
- } else {
- response.action(ActionCode.CLOSE, null);
}
- } while (state == SocketState.ASYNC_END);
+
+ if (dispatches == null || !dispatches.hasNext()) {
+ // Only returns non-null iterator if there are
+ // dispatches to process.
+ dispatches = getIteratorAndClearDispatches();
+ }
+ if (log.isDebugEnabled()) {
+ log.debug(sm.getString("streamProcessor.process.loopend",
+ stream.getConnectionId(), stream.getIdentifier(),
state, dispatches));
+ }
+ } while (state == SocketState.ASYNC_END ||
+ dispatches != null && state != SocketState.CLOSED);
+
+ if (state == SocketState.CLOSED) {
+ // TODO
+ }
} catch (Exception e) {
// TODO
e.printStackTrace();
@@ -265,7 +298,24 @@ public class StreamProcessor extends Abs
result.set(stream.isInputFinished());
break;
}
-
+ case NB_WRITE_INTEREST: {
+ // TODO: Thread safe? Do this in output buffer?
+ AtomicBoolean result = (AtomicBoolean) param;
+ result.set(stream.getOutputBuffer().isReady());
+ break;
+ }
+ case DISPATCH_READ: {
+ dispatches.add(DispatchType.NON_BLOCKING_READ);
+ break;
+ }
+ case DISPATCH_WRITE: {
+ dispatches.add(DispatchType.NON_BLOCKING_WRITE);
+ break;
+ }
+ case DISPATCH_EXECUTE: {
+ socketWrapper.getEndpoint().getExecutor().execute(this);
+ break;
+ }
// Unsupported / illegal under HTTP/2
case UPGRADE:
@@ -277,12 +327,8 @@ public class StreamProcessor extends Abs
case AVAILABLE:
case CLOSE_NOW:
case DISABLE_SWALLOW_INPUT:
- case DISPATCH_EXECUTE:
- case DISPATCH_READ:
- case DISPATCH_WRITE:
case END_REQUEST:
case NB_READ_INTEREST:
- case NB_WRITE_INTEREST:
case REQ_SET_BODY_REPLAY:
case RESET:
log.info("TODO: Implement [" + actionCode + "] for HTTP/2");
@@ -328,15 +374,126 @@ public class StreamProcessor extends Abs
@Override
public SocketState process(SocketWrapperBase<?> socket) throws IOException
{
- // Should never happen
- throw new
IllegalStateException(sm.getString("streamProcessor.httpupgrade.notsupported"));
+ try {
+ adapter.service(request, response);
+ } catch (Exception e) {
+ setErrorState(ErrorState.CLOSE_NOW, e);
+ }
+
+ if (getErrorState().isError()) {
+ action(ActionCode.CLOSE, null);
+ request.updateCounters();
+ return SocketState.CLOSED;
+ } else if (isAsync()) {
+ return SocketState.LONG;
+ } else {
+ action(ActionCode.CLOSE, null);
+ request.updateCounters();
+ return SocketState.CLOSED;
+ }
}
@Override
public SocketState dispatch(SocketStatus status) {
- // Should never happen
- throw new
IllegalStateException(sm.getString("streamProcessor.httpupgrade.notsupported"));
+ if (log.isDebugEnabled()) {
+ log.debug(sm.getString("streamProcessor.dispatch",
stream.getConnectionId(),
+ stream.getIdentifier(), status));
+ }
+ if (status == SocketStatus.OPEN_WRITE && response.getWriteListener()
!= null) {
+ try {
+ asyncStateMachine.asyncOperation();
+
+ if (stream.getOutputBuffer().flush(false)) {
+ // The buffer wasn't fully flushed so re-register the
+ // stream for write. Note this does not go via the
+ // Response since the write registration state at
+ // that level should remain unchanged. Once the buffer
+ // has been emptied then the code below will call
+ // dispatch() which will enable the
+ // Response to respond to this event.
+ if (stream.getOutputBuffer().isReady()) {
+ // Unexpected
+ throw new IllegalStateException();
+ }
+ return SocketState.LONG;
+ }
+ } catch (IOException | IllegalStateException x) {
+ // IOE - Problem writing to socket
+ // ISE - Request/Response not in correct state for async write
+ if (log.isDebugEnabled()) {
+ log.debug("Unable to write async data.",x);
+ }
+ status = SocketStatus.ASYNC_WRITE_ERROR;
+ request.setAttribute(RequestDispatcher.ERROR_EXCEPTION, x);
+ }
+ } else if (status == SocketStatus.OPEN_READ &&
request.getReadListener() != null) {
+ try {
+ asyncStateMachine.asyncOperation();
+ } catch (IllegalStateException x) {
+ // ISE - Request/Response not in correct state for async read
+ if (log.isDebugEnabled()) {
+ log.debug("Unable to read async data.",x);
+ }
+ status = SocketStatus.ASYNC_READ_ERROR;
+ request.setAttribute(RequestDispatcher.ERROR_EXCEPTION, x);
+ }
+ }
+
+ RequestInfo rp = request.getRequestProcessor();
+ try {
+ rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
+ if (!getAdapter().asyncDispatch(request, response, status)) {
+ setErrorState(ErrorState.CLOSE_NOW, null);
+ }
+ } catch (InterruptedIOException e) {
+ setErrorState(ErrorState.CLOSE_NOW, e);
+ } catch (Throwable t) {
+ ExceptionUtils.handleThrowable(t);
+ setErrorState(ErrorState.CLOSE_NOW, t);
+ log.error(sm.getString("http11processor.request.process"), t);
+ }
+
+ rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
+
+ if (getErrorState().isError()) {
+ request.updateCounters();
+ return SocketState.CLOSED;
+ } else if (isAsync()) {
+ return SocketState.LONG;
+ } else {
+ request.updateCounters();
+ return SocketState.CLOSED;
+ }
+ }
+
+
+ public void addDispatch(DispatchType dispatchType) {
+ synchronized (dispatches) {
+ dispatches.add(dispatchType);
+ }
+ }
+ public Iterator<DispatchType> getIteratorAndClearDispatches() {
+ // Note: Logic in AbstractProtocol depends on this method only
returning
+ // a non-null value if the iterator is non-empty. i.e. it should never
+ // return an empty iterator.
+ Iterator<DispatchType> result;
+ synchronized (dispatches) {
+ // Synchronized as the generation of the iterator and the clearing
+ // of dispatches needs to be an atomic operation.
+ result = dispatches.iterator();
+ if (result.hasNext()) {
+ dispatches.clear();
+ } else {
+ result = null;
+ }
+ }
+ return result;
+ }
+ public void clearDispatches() {
+ synchronized (dispatches) {
+ dispatches.clear();
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]