This is an automated email from the ASF dual-hosted git repository.
remm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tomcat.git
The following commit(s) were added to refs/heads/master by this push:
new bc714fd Add asynchronous IO API for NIO
bc714fd is described below
commit bc714fd2e3fca74d8931533770b6ee064b67287f
Author: remm <[email protected]>
AuthorDate: Wed Apr 10 21:37:48 2019 +0200
Add asynchronous IO API for NIO
This uses the concepts from the NIO2 implementation. The HTTP/2 and
WebSocket async IO code will automatically use it, with the same
behavior as when using NIO2.
As it seems to work just fine so far, and with the next build being far
off, I am enabling it by default so that it gets some testing.
---
.../apache/tomcat/util/net/AbstractEndpoint.java | 8 +
java/org/apache/tomcat/util/net/AprEndpoint.java | 4 +
java/org/apache/tomcat/util/net/Nio2Endpoint.java | 3 +-
java/org/apache/tomcat/util/net/NioEndpoint.java | 493 +++++++++++++++++----
webapps/docs/changelog.xml | 6 +
webapps/docs/config/http.xml | 10 +
6 files changed, 443 insertions(+), 81 deletions(-)
diff --git a/java/org/apache/tomcat/util/net/AbstractEndpoint.java
b/java/org/apache/tomcat/util/net/AbstractEndpoint.java
index 0b8b683..682012c 100644
--- a/java/org/apache/tomcat/util/net/AbstractEndpoint.java
+++ b/java/org/apache/tomcat/util/net/AbstractEndpoint.java
@@ -741,6 +741,14 @@ public abstract class AbstractEndpoint<S,U> {
public boolean getDaemon() { return daemon; }
+ /**
+ * Expose async IO capability.
+ */
+ private boolean useAsyncIO = true;
+ public void setUseAsyncIO(boolean useAsyncIO) { this.useAsyncIO =
useAsyncIO; }
+ public boolean getUseAsyncIO() { return useAsyncIO; }
+
+
protected abstract boolean getDeferAccept();
diff --git a/java/org/apache/tomcat/util/net/AprEndpoint.java
b/java/org/apache/tomcat/util/net/AprEndpoint.java
index b9f15d0..bfb5c55 100644
--- a/java/org/apache/tomcat/util/net/AprEndpoint.java
+++ b/java/org/apache/tomcat/util/net/AprEndpoint.java
@@ -129,6 +129,10 @@ public class AprEndpoint extends
AbstractEndpoint<Long,Long> implements SNICallB
public boolean getDeferAccept() { return deferAccept; }
+ @Override
+ public boolean getUseAsyncIO() { return false; }
+
+
private boolean ipv6v6only = false;
public void setIpv6v6only(boolean ipv6v6only) { this.ipv6v6only =
ipv6v6only; }
public boolean getIpv6v6only() { return ipv6v6only; }
diff --git a/java/org/apache/tomcat/util/net/Nio2Endpoint.java
b/java/org/apache/tomcat/util/net/Nio2Endpoint.java
index fd31ac1..370934d 100644
--- a/java/org/apache/tomcat/util/net/Nio2Endpoint.java
+++ b/java/org/apache/tomcat/util/net/Nio2Endpoint.java
@@ -91,6 +91,7 @@ public class Nio2Endpoint extends
AbstractJsseEndpoint<Nio2Channel,AsynchronousS
// ------------------------------------------------------------- Properties
+
/**
* Is deferAccept supported?
*/
@@ -941,7 +942,7 @@ public class Nio2Endpoint extends
AbstractJsseEndpoint<Nio2Channel,AsynchronousS
@Override
public boolean hasAsyncIO() {
- return true;
+ return getEndpoint().getUseAsyncIO();
}
/**
diff --git a/java/org/apache/tomcat/util/net/NioEndpoint.java
b/java/org/apache/tomcat/util/net/NioEndpoint.java
index 621dd63..74b0b88 100644
--- a/java/org/apache/tomcat/util/net/NioEndpoint.java
+++ b/java/org/apache/tomcat/util/net/NioEndpoint.java
@@ -27,7 +27,9 @@ import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.Channel;
+import java.nio.channels.CompletionHandler;
import java.nio.channels.FileChannel;
+import java.nio.channels.InterruptedByTimeoutException;
import java.nio.channels.NetworkChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
@@ -37,6 +39,7 @@ import java.nio.channels.WritableByteChannel;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -353,7 +356,6 @@ public class NioEndpoint extends
AbstractJsseEndpoint<NioChannel,SocketChannel>
serverSock = null;
}
-
// ------------------------------------------------------ Protected Methods
public NioSelectorPool getSelectorPool() {
@@ -382,7 +384,7 @@ public class NioEndpoint extends
AbstractJsseEndpoint<NioChannel,SocketChannel>
protected boolean setSocketOptions(SocketChannel socket) {
// Process the connection
try {
- //disable blocking, APR style, we are gonna be polling it
+ // Disable blocking, polling will be used
socket.configureBlocking(false);
Socket sock = socket.socket();
socketProperties.setProperties(sock);
@@ -533,7 +535,7 @@ public class NioEndpoint extends
AbstractJsseEndpoint<NioChannel,SocketChannel>
} else {
final NioSocketWrapper socketWrapper =
(NioSocketWrapper) key.attachment();
if (socketWrapper != null) {
- //we are registering the key to start with, reset
the fairness counter.
+ // We are registering the key to start with, reset
the fairness counter.
int ops = key.interestOps() | interestOps;
socketWrapper.interestOps(ops);
key.interestOps(ops);
@@ -566,7 +568,8 @@ public class NioEndpoint extends
AbstractJsseEndpoint<NioChannel,SocketChannel>
new SynchronizedQueue<>();
private volatile boolean close = false;
- private long nextExpiration = 0;//optimize expiration handling
+ // Optimize expiration handling
+ private long nextExpiration = 0;
private AtomicLong wakeupCounter = new AtomicLong(0);
@@ -578,7 +581,7 @@ public class NioEndpoint extends
AbstractJsseEndpoint<NioChannel,SocketChannel>
public int getKeyCount() { return keyCount; }
- public Selector getSelector() { return selector;}
+ public Selector getSelector() { return selector; }
/**
* Destroy the poller.
@@ -593,7 +596,9 @@ public class NioEndpoint extends
AbstractJsseEndpoint<NioChannel,SocketChannel>
private void addEvent(PollerEvent event) {
events.offer(event);
- if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup();
+ if (wakeupCounter.incrementAndGet() == 0) {
+ selector.wakeup();
+ }
}
/**
@@ -608,12 +613,15 @@ public class NioEndpoint extends
AbstractJsseEndpoint<NioChannel,SocketChannel>
*/
public void add(final NioChannel socket, final int interestOps) {
PollerEvent r = eventCache.pop();
- if ( r==null) r = new PollerEvent(socket,null,interestOps);
- else r.reset(socket,null,interestOps);
+ if (r == null) {
+ r = new PollerEvent(socket, null, interestOps);
+ } else {
+ r.reset(socket, null, interestOps);
+ }
addEvent(r);
if (close) {
- NioEndpoint.NioSocketWrapper ka =
(NioEndpoint.NioSocketWrapper)socket.getAttachment();
- processSocket(ka, SocketEvent.STOP, false);
+ NioSocketWrapper socketWrapper = (NioSocketWrapper)
socket.getAttachment();
+ processSocket(socketWrapper, SocketEvent.STOP, false);
}
}
@@ -650,17 +658,20 @@ public class NioEndpoint extends
AbstractJsseEndpoint<NioChannel,SocketChannel>
*/
public void register(final NioChannel socket) {
socket.setPoller(this);
- NioSocketWrapper ka = new NioSocketWrapper(socket,
NioEndpoint.this);
- socket.setSocketWrapper(ka);
- ka.setPoller(this);
- ka.setReadTimeout(getConnectionTimeout());
- ka.setWriteTimeout(getConnectionTimeout());
- ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
- ka.setSecure(isSSLEnabled());
+ NioSocketWrapper socketWrapper = new NioSocketWrapper(socket,
NioEndpoint.this);
+ socket.setSocketWrapper(socketWrapper);
+ socketWrapper.setPoller(this);
+ socketWrapper.setReadTimeout(getConnectionTimeout());
+ socketWrapper.setWriteTimeout(getConnectionTimeout());
+
socketWrapper.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
+ socketWrapper.setSecure(isSSLEnabled());
PollerEvent r = eventCache.pop();
- ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER
turns into.
- if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
- else r.reset(socket,ka,OP_REGISTER);
+ socketWrapper.interestOps(SelectionKey.OP_READ);//this is what
OP_REGISTER turns into.
+ if (r == null) {
+ r = new PollerEvent(socket, socketWrapper, OP_REGISTER);
+ } else {
+ r.reset(socket, socketWrapper, OP_REGISTER);
+ }
addEvent(r);
}
@@ -736,8 +747,8 @@ public class NioEndpoint extends
AbstractJsseEndpoint<NioChannel,SocketChannel>
if (!close) {
hasEvents = events();
if (wakeupCounter.getAndSet(-1) > 0) {
- //if we are here, means we have other stuff to do
- //do a non blocking select
+ // If we are here, means we have other stuff to do
+ // Do a non blocking select
keyCount = selector.selectNow();
} else {
keyCount = selector.select(selectorTimeout);
@@ -759,7 +770,7 @@ public class NioEndpoint extends
AbstractJsseEndpoint<NioChannel,SocketChannel>
log.error(sm.getString("endpoint.nio.selectorLoopError"),
x);
continue;
}
- //either we timed out or we woke up, process events first
+ // Either we timed out or we woke up, process events first
if ( keyCount == 0 ) hasEvents = (hasEvents | events());
Iterator<SelectionKey> iterator =
@@ -777,34 +788,38 @@ public class NioEndpoint extends
AbstractJsseEndpoint<NioChannel,SocketChannel>
iterator.remove();
processKey(sk, attachment);
}
- }//while
+ }
- //process timeouts
+ // Process timeouts
timeout(keyCount,hasEvents);
- }//while
+ }
getStopLatch().countDown();
}
- protected void processKey(SelectionKey sk, NioSocketWrapper
attachment) {
+ protected void processKey(SelectionKey sk, NioSocketWrapper
socketWrapper) {
try {
- if ( close ) {
+ if (close) {
cancelledKey(sk);
- } else if ( sk.isValid() && attachment != null ) {
- if (sk.isReadable() || sk.isWritable() ) {
- if ( attachment.getSendfileData() != null ) {
- processSendfile(sk,attachment, false);
+ } else if (sk.isValid() && socketWrapper != null) {
+ if (sk.isReadable() || sk.isWritable()) {
+ if ( socketWrapper.getSendfileData() != null ) {
+ processSendfile(sk, socketWrapper, false);
} else {
- unreg(sk, attachment, sk.readyOps());
+ unreg(sk, socketWrapper, sk.readyOps());
boolean closeSocket = false;
// Read goes before write
if (sk.isReadable()) {
- if (!processSocket(attachment,
SocketEvent.OPEN_READ, true)) {
+ if (socketWrapper.readOperation != null) {
+
getExecutor().execute(socketWrapper.readOperation);
+ } else if (!processSocket(socketWrapper,
SocketEvent.OPEN_READ, true)) {
closeSocket = true;
}
}
if (!closeSocket && sk.isWritable()) {
- if (!processSocket(attachment,
SocketEvent.OPEN_WRITE, true)) {
+ if (socketWrapper.writeOperation != null) {
+
getExecutor().execute(socketWrapper.writeOperation);
+ } else if (!processSocket(socketWrapper,
SocketEvent.OPEN_WRITE, true)) {
closeSocket = true;
}
}
@@ -814,10 +829,10 @@ public class NioEndpoint extends
AbstractJsseEndpoint<NioChannel,SocketChannel>
}
}
} else {
- //invalid key
+ // Invalid key
cancelledKey(sk);
}
- } catch ( CancelledKeyException ckx ) {
+ } catch (CancelledKeyException ckx) {
cancelledKey(sk);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
@@ -934,14 +949,14 @@ public class NioEndpoint extends
AbstractJsseEndpoint<NioChannel,SocketChannel>
}
}
- protected void unreg(SelectionKey sk, NioSocketWrapper attachment, int
readyOps) {
- //this is a must, so that we don't have multiple threads messing
with the socket
- reg(sk,attachment,sk.interestOps()& (~readyOps));
+ protected void unreg(SelectionKey sk, NioSocketWrapper socketWrapper,
int readyOps) {
+ // This is a must, so that we don't have multiple threads messing
with the socket
+ reg(sk, socketWrapper, sk.interestOps() & (~readyOps));
}
- protected void reg(SelectionKey sk, NioSocketWrapper attachment, int
intops) {
+ protected void reg(SelectionKey sk, NioSocketWrapper socketWrapper,
int intops) {
sk.interestOps(intops);
- attachment.interestOps(intops);
+ socketWrapper.interestOps(intops);
}
protected void timeout(int keyCount, boolean hasEvents) {
@@ -956,39 +971,49 @@ public class NioEndpoint extends
AbstractJsseEndpoint<NioChannel,SocketChannel>
if (nextExpiration > 0 && (keyCount > 0 || hasEvents) && (now <
nextExpiration) && !close) {
return;
}
- //timeout
int keycount = 0;
try {
for (SelectionKey key : selector.keys()) {
keycount++;
try {
- NioSocketWrapper ka = (NioSocketWrapper)
key.attachment();
- if ( ka == null ) {
- cancelledKey(key); //we don't support any keys
without attachments
+ NioSocketWrapper socketWrapper = (NioSocketWrapper)
key.attachment();
+ if ( socketWrapper == null ) {
+ // We don't support any keys without attachments
+ cancelledKey(key);
} else if (close) {
key.interestOps(0);
- ka.interestOps(0); //avoid duplicate stop calls
- processKey(key,ka);
- } else if ((ka.interestOps()&SelectionKey.OP_READ) ==
SelectionKey.OP_READ ||
- (ka.interestOps()&SelectionKey.OP_WRITE) ==
SelectionKey.OP_WRITE) {
+ // Avoid duplicate stop calls
+ socketWrapper.interestOps(0);
+ processKey(key,socketWrapper);
+ } else if
((socketWrapper.interestOps()&SelectionKey.OP_READ) == SelectionKey.OP_READ ||
+
(socketWrapper.interestOps()&SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) {
boolean isTimedOut = false;
+ boolean readTimeout = false;
+ boolean writeTimeout = false;
// Check for read timeout
- if ((ka.interestOps() & SelectionKey.OP_READ) ==
SelectionKey.OP_READ) {
- long delta = now - ka.getLastRead();
- long timeout = ka.getReadTimeout();
+ if ((socketWrapper.interestOps() &
SelectionKey.OP_READ) == SelectionKey.OP_READ) {
+ long delta = now - socketWrapper.getLastRead();
+ long timeout = socketWrapper.getReadTimeout();
isTimedOut = timeout > 0 && delta > timeout;
+ readTimeout = true;
}
// Check for write timeout
- if (!isTimedOut && (ka.interestOps() &
SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) {
- long delta = now - ka.getLastWrite();
- long timeout = ka.getWriteTimeout();
+ if (!isTimedOut && (socketWrapper.interestOps() &
SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) {
+ long delta = now -
socketWrapper.getLastWrite();
+ long timeout = socketWrapper.getWriteTimeout();
isTimedOut = timeout > 0 && delta > timeout;
+ writeTimeout = true;
}
if (isTimedOut) {
key.interestOps(0);
- ka.interestOps(0); //avoid duplicate timeout
calls
- ka.setError(new SocketTimeoutException());
- if (!processSocket(ka, SocketEvent.ERROR,
true)) {
+ // Avoid duplicate timeout calls
+ socketWrapper.interestOps(0);
+ socketWrapper.setError(new
SocketTimeoutException());
+ if (readTimeout && socketWrapper.readOperation
!= null) {
+
getExecutor().execute(socketWrapper.readOperation);
+ } else if (writeTimeout &&
socketWrapper.writeOperation != null) {
+
getExecutor().execute(socketWrapper.writeOperation);
+ } else if (!processSocket(socketWrapper,
SocketEvent.ERROR, true)) {
cancelledKey(key);
}
}
@@ -996,12 +1021,13 @@ public class NioEndpoint extends
AbstractJsseEndpoint<NioChannel,SocketChannel>
}catch ( CancelledKeyException ckx ) {
cancelledKey(key);
}
- }//for
+ }
} catch (ConcurrentModificationException cme) {
// See https://bz.apache.org/bugzilla/show_bug.cgi?id=57943
log.warn(sm.getString("endpoint.nio.timeoutCme"), cme);
}
- long prevExp = nextExpiration; //for logging purposes only
+ // For logging purposes only
+ long prevExp = nextExpiration;
nextExpiration = System.currentTimeMillis() +
socketProperties.getTimeoutInterval();
if (log.isTraceEnabled()) {
@@ -1014,7 +1040,8 @@ public class NioEndpoint extends
AbstractJsseEndpoint<NioChannel,SocketChannel>
}
}
- // ---------------------------------------------------- Key Attachment
Class
+ // --------------------------------------------------- Socket Wrapper Class
+
public static class NioSocketWrapper extends SocketWrapperBase<NioChannel>
{
private final NioSelectorPool pool;
@@ -1023,6 +1050,10 @@ public class NioEndpoint extends
AbstractJsseEndpoint<NioChannel,SocketChannel>
private int interestOps = 0;
private CountDownLatch readLatch = null;
private CountDownLatch writeLatch = null;
+ private final Semaphore readPending;
+ private OperationState<?> readOperation = null;
+ private final Semaphore writePending;
+ private OperationState<?> writeOperation = null;
private volatile SendfileData sendfileData = null;
private volatile long lastRead = System.currentTimeMillis();
private volatile long lastWrite = lastRead;
@@ -1030,41 +1061,54 @@ public class NioEndpoint extends
AbstractJsseEndpoint<NioChannel,SocketChannel>
public NioSocketWrapper(NioChannel channel, NioEndpoint endpoint) {
super(channel, endpoint);
+ if (endpoint.getUseAsyncIO()) {
+ readPending = new Semaphore(1);
+ writePending = new Semaphore(1);
+ } else {
+ readPending = null;
+ writePending = null;
+ }
pool = endpoint.getSelectorPool();
socketBufferHandler = channel.getBufHandler();
}
- public Poller getPoller() { return poller;}
- public void setPoller(Poller poller){this.poller = poller;}
- public int interestOps() { return interestOps;}
+ public Poller getPoller() { return poller; }
+ public void setPoller(Poller poller){this.poller = poller; }
+ public int interestOps() { return interestOps; }
public int interestOps(int ops) { this.interestOps = ops; return ops;
}
public CountDownLatch getReadLatch() { return readLatch; }
public CountDownLatch getWriteLatch() { return writeLatch; }
protected CountDownLatch resetLatch(CountDownLatch latch) {
- if ( latch==null || latch.getCount() == 0 ) return null;
- else throw new
IllegalStateException(sm.getString("endpoint.nio.latchMustBeZero"));
+ if (latch==null || latch.getCount() == 0) {
+ return null;
+ } else {
+ throw new
IllegalStateException(sm.getString("endpoint.nio.latchMustBeZero"));
+ }
}
public void resetReadLatch() { readLatch = resetLatch(readLatch); }
public void resetWriteLatch() { writeLatch = resetLatch(writeLatch); }
protected CountDownLatch startLatch(CountDownLatch latch, int cnt) {
- if ( latch == null || latch.getCount() == 0 ) {
+ if (latch == null || latch.getCount() == 0) {
return new CountDownLatch(cnt);
+ } else {
+ throw new
IllegalStateException(sm.getString("endpoint.nio.latchMustBeZero"));
}
- else throw new
IllegalStateException(sm.getString("endpoint.nio.latchMustBeZero"));
}
- public void startReadLatch(int cnt) { readLatch =
startLatch(readLatch,cnt);}
- public void startWriteLatch(int cnt) { writeLatch =
startLatch(writeLatch,cnt);}
+ public void startReadLatch(int cnt) { readLatch =
startLatch(readLatch, cnt); }
+ public void startWriteLatch(int cnt) { writeLatch =
startLatch(writeLatch, cnt); }
protected void awaitLatch(CountDownLatch latch, long timeout, TimeUnit
unit) throws InterruptedException {
- if ( latch == null ) throw new
IllegalStateException(sm.getString("endpoint.nio.nullLatch"));
+ if (latch == null) {
+ throw new
IllegalStateException(sm.getString("endpoint.nio.nullLatch"));
+ }
// Note: While the return value is ignored if the latch does time
// out, logic further up the call stack will trigger a
// SocketTimeoutException
- latch.await(timeout,unit);
+ latch.await(timeout, unit);
}
- public void awaitReadLatch(long timeout, TimeUnit unit) throws
InterruptedException { awaitLatch(readLatch,timeout,unit);}
- public void awaitWriteLatch(long timeout, TimeUnit unit) throws
InterruptedException { awaitLatch(writeLatch,timeout,unit);}
+ public void awaitReadLatch(long timeout, TimeUnit unit) throws
InterruptedException { awaitLatch(readLatch, timeout, unit); }
+ public void awaitWriteLatch(long timeout, TimeUnit unit) throws
InterruptedException { awaitLatch(writeLatch, timeout, unit); }
public void setSendfileData(SendfileData sf) { this.sendfileData = sf;}
public SendfileData getSendfileData() { return this.sendfileData;}
@@ -1074,7 +1118,6 @@ public class NioEndpoint extends
AbstractJsseEndpoint<NioChannel,SocketChannel>
public void updateLastRead() { lastRead = System.currentTimeMillis(); }
public long getLastRead() { return lastRead; }
-
@Override
public boolean isReadyForRead() throws IOException {
socketBufferHandler.configureReadBufferForRead();
@@ -1189,12 +1232,11 @@ public class NioEndpoint extends
AbstractJsseEndpoint<NioChannel,SocketChannel>
// Ignore
}
try {
- NioEndpoint.NioSocketWrapper att =
(NioEndpoint.NioSocketWrapper) channel
- .getAttachment();
- if (att == null) {
+ NioSocketWrapper socketWrapper = (NioSocketWrapper)
channel.getAttachment();
+ if (socketWrapper == null) {
throw new
IOException(sm.getString("endpoint.nio.keyMustBeCancelled"));
}
- nRead = pool.read(to, channel, selector,
att.getReadTimeout());
+ nRead = pool.read(to, channel, selector,
socketWrapper.getReadTimeout());
} finally {
if (selector != null) {
pool.put(selector);
@@ -1354,6 +1396,296 @@ public class NioEndpoint extends
AbstractJsseEndpoint<NioChannel,SocketChannel>
public void setAppReadBufHandler(ApplicationBufferHandler handler) {
getSocket().setAppReadBufHandler(handler);
}
+
+ @Override
+ public boolean hasAsyncIO() {
+ // The semaphores are only created if async IO is enabled
+ return (readPending != null);
+ }
+
+ /**
+ * Internal state tracker for scatter/gather operations.
+ */
+ private class OperationState<A> implements Runnable {
+ private final boolean read;
+ private final ByteBuffer[] buffers;
+ private final int offset;
+ private final int length;
+ private final A attachment;
+ private final BlockingMode block;
+ private final CompletionCheck check;
+ private final CompletionHandler<Long, ? super A> handler;
+ private final Semaphore semaphore;
+ private final VectoredIOCompletionHandler<A> completion;
+ private OperationState(boolean read, ByteBuffer[] buffers, int
offset, int length,
+ BlockingMode block, long timeout, TimeUnit unit, A
attachment,
+ CompletionCheck check, CompletionHandler<Long, ? super A>
handler,
+ Semaphore semaphore, VectoredIOCompletionHandler<A>
completion) {
+ this.read = read;
+ this.buffers = buffers;
+ this.offset = offset;
+ this.length = length;
+ this.block = block;
+ this.attachment = attachment;
+ this.check = check;
+ this.handler = handler;
+ this.semaphore = semaphore;
+ this.completion = completion;
+ }
+ private volatile boolean inline = true;
+ private volatile long nBytes = 0;
+ private volatile CompletionState state = CompletionState.PENDING;
+
+ @Override
+ public void run() {
+ // Perform the IO operation
+ // Called from the poller to continue the IO operation
+ long nBytes = 0;
+ if (getError() == null) {
+ try {
+ if (read) {
+ nBytes = getSocket().read(buffers, offset, length);
+ } else {
+ nBytes = getSocket().write(buffers, offset,
length);
+ }
+ } catch (IOException e) {
+ setError(e);
+ }
+ }
+ if (nBytes > 0) {
+ // The bytes read are only updated in the completion
handler
+ completion.completed(Long.valueOf(nBytes), this);
+ } else if (nBytes < 0 || getError() != null) {
+ IOException error = getError();
+ if (error == null) {
+ error = new EOFException();
+ }
+ completion.failed(error, this);
+ } else {
+ // As soon as the operation uses the poller, it is no
longer inline
+ inline = false;
+ if (read) {
+ registerReadInterest();
+ } else {
+ registerWriteInterest();
+ }
+ }
+ }
+
+ }
+
+ @Override
+ public <A> CompletionState read(ByteBuffer[] dsts, int offset, int
length,
+ BlockingMode block, long timeout, TimeUnit unit, A attachment,
+ CompletionCheck check, CompletionHandler<Long, ? super A>
handler) {
+ IOException ioe = getError();
+ if (ioe != null) {
+ handler.failed(ioe, attachment);
+ return CompletionState.ERROR;
+ }
+ if (timeout == -1) {
+ timeout = toTimeout(getReadTimeout());
+ } else if (unit.toMillis(timeout) != getReadTimeout()) {
+ setReadTimeout(unit.toMillis(timeout));
+ }
+ if (block != BlockingMode.NON_BLOCK) {
+ try {
+ if (!readPending.tryAcquire(timeout, unit)) {
+ handler.failed(new SocketTimeoutException(),
attachment);
+ return CompletionState.ERROR;
+ }
+ } catch (InterruptedException e) {
+ handler.failed(e, attachment);
+ return CompletionState.ERROR;
+ }
+ } else {
+ if (!readPending.tryAcquire()) {
+ return CompletionState.NOT_DONE;
+ }
+ }
+ VectoredIOCompletionHandler<A> completion = new
VectoredIOCompletionHandler<>();
+ OperationState<A> state = new OperationState<>(true, dsts, offset,
length, block,
+ timeout, unit, attachment, check, handler, readPending,
completion);
+ readOperation = state;
+ long nBytes = 0;
+ if (!socketBufferHandler.isReadBufferEmpty()) {
+ // There is still data inside the main read buffer, use it to
fill out the destination buffers
+ // Note: It is not necessary to put this code in the
completion handler
+ socketBufferHandler.configureReadBufferForRead();
+ for (int i = 0; i < length &&
!socketBufferHandler.isReadBufferEmpty(); i++) {
+ nBytes += transfer(socketBufferHandler.getReadBuffer(),
dsts[offset + i]);
+ }
+ if (nBytes > 0) {
+ completion.completed(Long.valueOf(nBytes), state);
+ }
+ }
+ if (nBytes == 0) {
+ state.run();
+ }
+ if (block == BlockingMode.BLOCK) {
+ synchronized (state) {
+ if (state.state == CompletionState.PENDING) {
+ try {
+ state.wait(unit.toMillis(timeout));
+ if (state.state == CompletionState.PENDING) {
+ return CompletionState.ERROR;
+ }
+ } catch (InterruptedException e) {
+ handler.failed(new SocketTimeoutException(),
attachment);
+ return CompletionState.ERROR;
+ }
+ }
+ }
+ }
+ return state.state;
+ }
+
+ @Override
+ public <A> CompletionState write(ByteBuffer[] srcs, int offset, int
length,
+ BlockingMode block, long timeout, TimeUnit unit, A attachment,
+ CompletionCheck check, CompletionHandler<Long, ? super A>
handler) {
+ IOException ioe = getError();
+ if (ioe != null) {
+ handler.failed(ioe, attachment);
+ return CompletionState.ERROR;
+ }
+ if (timeout == -1) {
+ timeout = toTimeout(getWriteTimeout());
+ } else if (unit.toMillis(timeout) != getWriteTimeout()) {
+ setWriteTimeout(unit.toMillis(timeout));
+ }
+ if (block != BlockingMode.NON_BLOCK) {
+ try {
+ if (!writePending.tryAcquire(timeout, unit)) {
+ handler.failed(new SocketTimeoutException(),
attachment);
+ return CompletionState.ERROR;
+ }
+ } catch (InterruptedException e) {
+ handler.failed(e, attachment);
+ return CompletionState.ERROR;
+ }
+ } else {
+ if (!writePending.tryAcquire()) {
+ return CompletionState.NOT_DONE;
+ }
+ }
+ if (!socketBufferHandler.isWriteBufferEmpty()) {
+ // First flush the main buffer as needed
+ try {
+ doWrite(true);
+ } catch (IOException e) {
+ handler.failed(e, attachment);
+ return CompletionState.ERROR;
+ }
+ }
+ VectoredIOCompletionHandler<A> completion = new
VectoredIOCompletionHandler<>();
+ OperationState<A> state = new OperationState<>(false, srcs,
offset, length, block,
+ timeout, unit, attachment, check, handler, writePending,
completion);
+ writeOperation = state;
+ // It should be less necessary to check the buffer state as it is
easy to flush before
+ state.run();
+ if (block == BlockingMode.BLOCK) {
+ synchronized (state) {
+ if (state.state == CompletionState.PENDING) {
+ try {
+ state.wait(unit.toMillis(timeout));
+ if (state.state == CompletionState.PENDING) {
+ return CompletionState.ERROR;
+ }
+ } catch (InterruptedException e) {
+ handler.failed(new SocketTimeoutException(),
attachment);
+ return CompletionState.ERROR;
+ }
+ }
+ }
+ }
+ return state.state;
+ }
+
+ private class VectoredIOCompletionHandler<A> implements
CompletionHandler<Long, OperationState<A>> {
+ @Override
+ public void completed(Long nBytes, OperationState<A> state) {
+ if (nBytes.longValue() < 0) {
+ failed(new EOFException(), state);
+ } else {
+ state.nBytes += nBytes.longValue();
+ CompletionState currentState = state.inline ?
CompletionState.INLINE : CompletionState.DONE;
+ boolean complete = true;
+ boolean completion = true;
+ if (state.check != null) {
+ switch (state.check.callHandler(currentState,
state.buffers, state.offset, state.length)) {
+ case CONTINUE:
+ complete = false;
+ break;
+ case DONE:
+ break;
+ case NONE:
+ completion = false;
+ break;
+ }
+ }
+ if (complete) {
+ boolean notify = false;
+ state.semaphore.release();
+ if (state.read) {
+ readOperation = null;
+ } else {
+ writeOperation = null;
+ }
+ if (state.block == BlockingMode.BLOCK && currentState
!= CompletionState.INLINE) {
+ notify = true;
+ } else {
+ state.state = currentState;
+ }
+ if (completion && state.handler != null) {
+
state.handler.completed(Long.valueOf(state.nBytes), state.attachment);
+ }
+ if (notify) {
+ synchronized (state) {
+ state.state = currentState;
+ state.notify();
+ }
+ }
+ } else {
+ state.run();
+ }
+ }
+ }
+ @Override
+ public void failed(Throwable exc, OperationState<A> state) {
+ IOException ioe;
+ if (exc instanceof InterruptedByTimeoutException) {
+ ioe = new SocketTimeoutException();
+ } else if (exc instanceof IOException) {
+ ioe = (IOException) exc;
+ } else {
+ ioe = new IOException(exc);
+ }
+ setError(ioe);
+ boolean notify = false;
+ state.semaphore.release();
+ if (state.read) {
+ readOperation = null;
+ } else {
+ writeOperation = null;
+ }
+ if (state.block == BlockingMode.BLOCK) {
+ notify = true;
+ } else {
+ state.state = state.inline ? CompletionState.ERROR :
CompletionState.DONE;
+ }
+ if (state.handler != null) {
+ state.handler.failed(ioe, state.attachment);
+ }
+ if (notify) {
+ synchronized (state) {
+ state.state = state.inline ? CompletionState.ERROR :
CompletionState.DONE;
+ state.notify();
+ }
+ }
+ }
+ }
+
}
@@ -1443,6 +1775,7 @@ public class NioEndpoint extends
AbstractJsseEndpoint<NioChannel,SocketChannel>
}
// ----------------------------------------------- SendfileData Inner Class
+
/**
* SendfileData class.
*/
diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml
index 7fef23d..fc07b86 100644
--- a/webapps/docs/changelog.xml
+++ b/webapps/docs/changelog.xml
@@ -50,6 +50,12 @@
<update>
Add vectoring for NIO in the base and SSL channels. (remm)
</update>
+ <add>
+ Add asynchronous IO from NIO2 to the NIO connector, with support for
+ the async IO implementations for HTTP/2 and Websockets. The
+ <code>useAsyncIO</code> boolean attribute on the Connector element
+ allows disabling usage of the asynchronous IO API. (remm)
+ </add>
</changelog>
</subsection>
</section>
diff --git a/webapps/docs/config/http.xml b/webapps/docs/config/http.xml
index f3e3f7a..9e68e39 100644
--- a/webapps/docs/config/http.xml
+++ b/webapps/docs/config/http.xml
@@ -760,6 +760,11 @@
default value is <code>1000</code> milliseconds.</p>
</attribute>
+ <attribute name="useAsyncIO" required="false">
+ <p>(bool)Use this attribute to enable or disable usage of the
+ asynchronous IO API. The default value is <code>true</code>.</p>
+ </attribute>
+
<attribute name="useSendfile" required="false">
<p>(bool)Use this attribute to enable or disable sendfile capability.
The default value is <code>true</code>. Note that the use of sendfile
@@ -899,6 +904,11 @@
<attributes>
+ <attribute name="useAsyncIO" required="false">
+ <p>(bool)Use this attribute to enable or disable usage of the
+ asynchronous IO API. The default value is <code>true</code>.</p>
+ </attribute>
+
<attribute name="useSendfile" required="false">
<p>(bool)Use this attribute to enable or disable sendfile capability.
The default value is <code>true</code>. Note that the use of sendfile
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]