Author: markt
Date: Thu Aug 15 10:32:15 2013
New Revision: 1514228
URL: http://svn.apache.org/r1514228
Log:
The container is responsible for the first call to each of onWritePossible()
and onDataAvailable() once a listener has been set.
The main component is the addition to the SocketWrapper of a list of dispatch
types that need to be made. "Dispatch type" in this case means "process the
socket using the specified SocketStatus". This is used to trigger the first
call to each of onWritePossible() and onDataAvailable() for which the container
is responsible.
Fix some additional issues identified in the test case.
Added:
tomcat/trunk/java/org/apache/tomcat/util/net/DispatchType.java (with props)
Modified:
tomcat/trunk/java/org/apache/catalina/connector/InputBuffer.java
tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java
tomcat/trunk/java/org/apache/coyote/ActionCode.java
tomcat/trunk/java/org/apache/coyote/Response.java
tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java
tomcat/trunk/java/org/apache/coyote/http11/InternalInputBuffer.java
tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java
tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java
Modified: tomcat/trunk/java/org/apache/catalina/connector/InputBuffer.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/connector/InputBuffer.java?rev=1514228&r1=1514227&r2=1514228&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/connector/InputBuffer.java (original)
+++ tomcat/trunk/java/org/apache/catalina/connector/InputBuffer.java Thu Aug 15 10:32:15 2013
@@ -249,6 +249,18 @@ public class InputBuffer extends Reader
public void setReadListener(ReadListener listener) {
coyoteRequest.setReadListener(listener);
+
+ // The container is responsible for the first call to
+ // listener.onDataAvailable(). If isReady() returns true, the container
+ // needs to call listener.onDataAvailable() from a new thread. If
+ // isReady() returns false, the socket will be registered for read and
+ // the container will call listener.onDataAvailable() once data arrives.
+ // Must call isFinished() first as a call to isReady() if the request
+ // has been finished will register the socket for read interest and that
+ // is not required.
+ if (isFinished() || isReady()) {
+ coyoteRequest.action(ActionCode.DISPATCH_READ, null);
+ }
}
Modified: tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java?rev=1514228&r1=1514227&r2=1514228&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java (original)
+++ tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java Thu Aug 15 10:32:15 2013
@@ -38,6 +38,7 @@ import org.apache.tomcat.util.collection
import org.apache.tomcat.util.modeler.Registry;
import org.apache.tomcat.util.net.AbstractEndpoint;
import org.apache.tomcat.util.net.AbstractEndpoint.Handler;
+import org.apache.tomcat.util.net.DispatchType;
import org.apache.tomcat.util.net.SocketStatus;
import org.apache.tomcat.util.net.SocketWrapper;
import org.apache.tomcat.util.res.StringManager;
@@ -616,7 +617,11 @@ public abstract class AbstractProtocol i
SocketState state = SocketState.CLOSED;
do {
- if (status == SocketStatus.DISCONNECT &&
+ if (wrapper.hasNextDispatch()) {
+ DispatchType nextDispatch = wrapper.getNextDispatch();
+ state = processor.asyncDispatch(
+ nextDispatch.getSocketStatus());
+ } else if (status == SocketStatus.DISCONNECT &&
!processor.isComet()) {
// Do nothing here, just wait for it to get recycled
// Don't do this for Comet we need to generate an end
@@ -663,7 +668,8 @@ public abstract class AbstractProtocol i
"], State out: [" + state + "]");
}
} while (state == SocketState.ASYNC_END ||
- state == SocketState.UPGRADING);
+ state == SocketState.UPGRADING ||
+ wrapper.hasNextDispatch());
if (state == SocketState.LONG) {
// In the middle of processing a request/response. Keep the
Modified: tomcat/trunk/java/org/apache/coyote/ActionCode.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ActionCode.java?rev=1514228&r1=1514227&r2=1514228&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/ActionCode.java (original)
+++ tomcat/trunk/java/org/apache/coyote/ActionCode.java Thu Aug 15 10:32:15 2013
@@ -215,5 +215,17 @@ public enum ActionCode {
/**
* Indicates if the request body has been fully read.
*/
- REQUEST_BODY_FULLY_READ
+ REQUEST_BODY_FULLY_READ,
+
+ /**
+ * Indicates that the container needs to trigger a call to onDataAvailable()
+ * for the registered non-blocking read listener.
+ */
+ DISPATCH_READ,
+
+ /**
+ * Indicates that the container needs to trigger a call to onWritePossible()
+ * for the registered non-blocking write listener.
+ */
+ DISPATCH_WRITE
}
Modified: tomcat/trunk/java/org/apache/coyote/Response.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/Response.java?rev=1514228&r1=1514227&r2=1514228&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/Response.java (original)
+++ tomcat/trunk/java/org/apache/coyote/Response.java Thu Aug 15 10:32:15 2013
@@ -593,6 +593,21 @@ public final class Response {
}
this.listener = listener;
+
+ // The container is responsible for the first call to
+ // listener.onWritePossible(). If isReady() returns true, the container
+ // needs to call listener.onWritePossible() from a new thread. If
+ // isReady() returns false, the socket will be registered for write and
+ // the container will call listener.onWritePossible() once data can be
+ // written.
+ if (isReady()) {
+ action(ActionCode.DISPATCH_WRITE, null);
+ // Need to set the fireListener flag otherwise when the container
+ // tries to trigger onWritePossible, nothing will happen
+ synchronized (nonBlockingStateLock) {
+ fireListener = true;
+ }
+ }
}
public boolean isReady() {
Modified: tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java?rev=1514228&r1=1514227&r2=1514228&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java Thu Aug 15 10:32:15 2013
@@ -51,6 +51,7 @@ import org.apache.tomcat.util.http.MimeH
import org.apache.tomcat.util.log.UserDataHelper;
import org.apache.tomcat.util.net.AbstractEndpoint;
import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
+import org.apache.tomcat.util.net.DispatchType;
import org.apache.tomcat.util.net.SocketStatus;
import org.apache.tomcat.util.net.SocketWrapper;
import org.apache.tomcat.util.res.StringManager;
@@ -828,6 +829,10 @@ public abstract class AbstractHttp11Proc
} else if (actionCode == ActionCode.REQUEST_BODY_FULLY_READ) {
AtomicBoolean result = (AtomicBoolean) param;
result.set(getInputBuffer().isFinished());
+ } else if (actionCode == ActionCode.DISPATCH_READ) {
+ socketWrapper.addDispatch(DispatchType.NON_BLOCKING_READ);
+ } else if (actionCode == ActionCode.DISPATCH_WRITE) {
+ socketWrapper.addDispatch(DispatchType.NON_BLOCKING_WRITE);
} else {
actionInternal(actionCode, param);
}
Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalInputBuffer.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalInputBuffer.java?rev=1514228&r1=1514227&r2=1514228&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/InternalInputBuffer.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/InternalInputBuffer.java Thu Aug 15 10:32:15 2013
@@ -554,7 +554,10 @@ public class InternalInputBuffer extends
@Override
protected int nbRead() throws IOException {
- throw new IllegalStateException("This method is unused for BIO");
+ // If this gets called for BIO need to make caller think there is data
+ // to read as BIO always reads whether there is data or not (and blocks
+ // until there is data to read).
+ return 1;
}
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java?rev=1514228&r1=1514227&r2=1514228&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java Thu Aug 15 10:32:15 2013
@@ -1718,16 +1718,30 @@ public class AprEndpoint extends Abstrac
// application code. By signalling read/write is possible, a
// read/write will be attempted, fail and that will trigger
// an exception the application will see.
- if ((desc[n*2] & Poll.APR_POLLIN) == Poll.APR_POLLIN ||
- (wrapper.pollerFlags & Poll.APR_POLLIN) == Poll.APR_POLLIN) {
- // Must be doing a non-blocking read
+ // Check the return flags first, followed by what the socket
+ // was registered for
+ if ((desc[n*2] & Poll.APR_POLLIN) == Poll.APR_POLLIN) {
+ // Error probably occurred during a non-blocking read
if (!processSocket(desc[n*2+1], SocketStatus.OPEN_READ)) {
// Close socket and clear pool
destroySocket(desc[n*2+1]);
}
- } else if ((desc[n*2] & Poll.APR_POLLOUT) == Poll.APR_POLLOUT ||
- (wrapper.pollerFlags & Poll.APR_POLLOUT) == Poll.APR_POLLOUT) {
- // Must be doing an non-blocking write write
+ } else if ((desc[n*2] & Poll.APR_POLLOUT) == Poll.APR_POLLOUT) {
+ // Error probably occurred during a non-blocking write
+ if (!processSocket(desc[n*2+1], SocketStatus.OPEN_WRITE)) {
+ // Close socket and clear pool
+ destroySocket(desc[n*2+1]);
+ }
+ } else if ((wrapper.pollerFlags & Poll.APR_POLLIN) == Poll.APR_POLLIN) {
+ // Can't tell what was happening when the error occurred but the
+ // socket is registered for non-blocking read so use that
+ if (!processSocket(desc[n*2+1], SocketStatus.OPEN_READ)) {
+ // Close socket and clear pool
+ destroySocket(desc[n*2+1]);
+ }
+ } else if ((wrapper.pollerFlags & Poll.APR_POLLOUT) == Poll.APR_POLLOUT) {
+ // Can't tell what was happening when the error occurred but the
+ // socket is registered for non-blocking write so use that
if (!processSocket(desc[n*2+1], SocketStatus.OPEN_WRITE)) {
// Close socket and clear pool
destroySocket(desc[n*2+1]);
Added: tomcat/trunk/java/org/apache/tomcat/util/net/DispatchType.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/DispatchType.java?rev=1514228&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/DispatchType.java (added)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/DispatchType.java Thu Aug 15 10:32:15 2013
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.util.net;
+
+/**
+ * This enumeration lists the different types of dispatches that request
+ * processing can trigger. In this instance, dispatch means re-process this
+ * request using the given socket status.
+ */
+public enum DispatchType {
+
+ NON_BLOCKING_READ(SocketStatus.OPEN_READ),
+ NON_BLOCKING_WRITE(SocketStatus.OPEN_WRITE);
+
+ private final SocketStatus status;
+
+ private DispatchType(SocketStatus status) {
+ this.status = status;
+ }
+
+ public SocketStatus getSocketStatus() {
+ return status;
+ }
+}
Propchange: tomcat/trunk/java/org/apache/tomcat/util/net/DispatchType.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java?rev=1514228&r1=1514227&r2=1514228&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java Thu Aug 15 10:32:15 2013
@@ -16,6 +16,9 @@
*/
package org.apache.tomcat.util.net;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -60,6 +63,8 @@ public class SocketWrapper<E> {
private final Object writeThreadLock = new Object();
public Object getWriteThreadLock() { return writeThreadLock; }
+ private Set<DispatchType> dispatches = new LinkedHashSet<>();
+
public SocketWrapper(E socket) {
this.socket = socket;
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@@ -108,4 +113,19 @@ public class SocketWrapper<E> {
public WriteLock getBlockingStatusWriteLock() {
return blockingStatusWriteLock;
}
+ public void addDispatch(DispatchType dispatchType) {
+ dispatches.add(dispatchType);
+ }
+ public boolean hasNextDispatch() {
+ return dispatches.size() > 0;
+ }
+ public DispatchType getNextDispatch() {
+ DispatchType result = null;
+ Iterator<DispatchType> iter = dispatches.iterator();
+ if (iter.hasNext()) {
+ result = iter.next();
+ iter.remove();
+ }
+ return result;
+ }
}
Modified: tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java?rev=1514228&r1=1514227&r2=1514228&view=diff
==============================================================================
--- tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java (original)
+++ tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java Thu Aug 15 10:32:15 2013
@@ -251,14 +251,6 @@ public class TestNonBlockingAPI extends
public void testNonBlockingWriteError() throws Exception {
Tomcat tomcat = getTomcatInstance();
- // Not applicable to BIO. This test does not start a new thread for the
- // write so with BIO all the writes happen in the service() method just
- // like blocking IO.
- if (tomcat.getConnector().getProtocolHandlerClassName().equals(
- "org.apache.coyote.http11.Http11Protocol")) {
- return;
- }
-
// Must have a real docBase - just use temp
StandardContext ctx = (StandardContext) tomcat.addContext(
"", System.getProperty("java.io.tmpdir"));
@@ -416,10 +408,8 @@ public class TestNonBlockingAPI extends
});
// step 2 - notify on read
ServletInputStream in = req.getInputStream();
- listener = new TestReadListener(actx);
+ listener = new TestReadListener(actx, false);
in.setReadListener(listener);
-
- listener.onDataAvailable();
}
}
@@ -462,13 +452,12 @@ public class TestNonBlockingAPI extends
});
// step 2 - notify on read
ServletInputStream in = req.getInputStream();
- rlistener = new TestReadListener(actx);
+ rlistener = new TestReadListener(actx, true);
in.setReadListener(rlistener);
ServletOutputStream out = resp.getOutputStream();
resp.setBufferSize(200 * 1024);
wlistener = new TestWriteListener(actx);
out.setWriteListener(wlistener);
- wlistener.onWritePossible();
}
@@ -476,9 +465,12 @@ public class TestNonBlockingAPI extends
private class TestReadListener implements ReadListener {
private final AsyncContext ctx;
private final StringBuilder body = new StringBuilder();
+ private final boolean usingNonBlockingWrite;
- public TestReadListener(AsyncContext ctx) {
+ public TestReadListener(AsyncContext ctx,
+ boolean usingNonBlockingWrite) {
this.ctx = ctx;
+ this.usingNonBlockingWrite = usingNonBlockingWrite;
}
@Override
@@ -501,18 +493,22 @@ public class TestNonBlockingAPI extends
@Override
public void onAllDataRead() {
log.info("onAllDataRead");
- String msg;
- if (body.toString().endsWith("FINISHED")) {
- msg = "OK";
- } else {
- msg = "FAILED";
- }
- try {
- ctx.getResponse().getOutputStream().print(msg);
- } catch (IOException ioe) {
- // Ignore
+ // If non-blocking writes are being used, don't write here as it
+ // will inject unexpected data into the write output.
+ if (!usingNonBlockingWrite) {
+ String msg;
+ if (body.toString().endsWith("FINISHED")) {
+ msg = "OK";
+ } else {
+ msg = "FAILED";
+ }
+ try {
+ ctx.getResponse().getOutputStream().print(msg);
+ } catch (IOException ioe) {
+ // Ignore
+ }
+ ctx.complete();
}
- ctx.complete();
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]