This is an automated email from the ASF dual-hosted git repository.
remm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tomcat.git
The following commit(s) were added to refs/heads/master by this push:
new b0393cb Add sync on operation
b0393cb is described below
commit b0393cbafc4324a85bb5759f15954e1b96f3bc8b
Author: remm <[email protected]>
AuthorDate: Mon Apr 15 18:17:26 2019 +0200
Add sync on operation
As an experiment, add synchronization inside the operation's run() to mimic
synchronizing on the wrapper. Also add error handling for failures when
handing the operation to the executor.
---
java/org/apache/tomcat/util/net/NioEndpoint.java | 81 +++++++++++++++---------
1 file changed, 52 insertions(+), 29 deletions(-)
diff --git a/java/org/apache/tomcat/util/net/NioEndpoint.java b/java/org/apache/tomcat/util/net/NioEndpoint.java
index 8ac66ff..3fc5564 100644
--- a/java/org/apache/tomcat/util/net/NioEndpoint.java
+++ b/java/org/apache/tomcat/util/net/NioEndpoint.java
@@ -39,6 +39,7 @@ import java.nio.channels.WritableByteChannel;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -811,14 +812,18 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
// Read goes before write
if (sk.isReadable()) {
if (socketWrapper.readOperation != null) {
-
getExecutor().execute(socketWrapper.readOperation);
+ if
(!socketWrapper.readOperation.process()) {
+ closeSocket = true;
+ }
} else if (!processSocket(socketWrapper,
SocketEvent.OPEN_READ, true)) {
closeSocket = true;
}
}
if (!closeSocket && sk.isWritable()) {
if (socketWrapper.writeOperation != null) {
-
getExecutor().execute(socketWrapper.writeOperation);
+ if
(!socketWrapper.writeOperation.process()) {
+ closeSocket = true;
+ }
} else if (!processSocket(socketWrapper,
SocketEvent.OPEN_WRITE, true)) {
closeSocket = true;
}
@@ -1436,38 +1441,56 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
private volatile long nBytes = 0;
private volatile CompletionState state = CompletionState.PENDING;
+ public boolean process() {
+ try {
+ getEndpoint().getExecutor().execute(this);
+ } catch (RejectedExecutionException ree) {
+ log.warn(sm.getString("endpoint.executor.fail",
NioSocketWrapper.this) , ree);
+ return false;
+ } catch (Throwable t) {
+ ExceptionUtils.handleThrowable(t);
+ // This means we got an OOM or similar creating a thread,
or that
+ // the pool and its queue are full
+ log.error(sm.getString("endpoint.process.fail"), t);
+ return false;
+ }
+ return true;
+ }
+
@Override
public void run() {
- // Perform the IO operation
- // Called from the poller to continue the IO operation
- long nBytes = 0;
- if (getError() == null) {
- try {
- if (read) {
- nBytes = getSocket().read(buffers, offset, length);
- } else {
- nBytes = getSocket().write(buffers, offset,
length);
+ synchronized (semaphore) {
+ // Perform the IO operation
+ // Called from the poller to continue the IO operation
+ long nBytes = 0;
+ if (getError() == null) {
+ try {
+ if (read) {
+ nBytes = getSocket().read(buffers, offset,
length);
+ } else {
+ nBytes = getSocket().write(buffers, offset,
length);
+ }
+ } catch (IOException e) {
+ setError(e);
}
- } catch (IOException e) {
- setError(e);
- }
- }
- if (nBytes > 0) {
- // The bytes read are only updated in the completion
handler
- completion.completed(Long.valueOf(nBytes), this);
- } else if (nBytes < 0 || getError() != null) {
- IOException error = getError();
- if (error == null) {
- error = new EOFException();
}
- completion.failed(error, this);
- } else {
- // As soon as the operation uses the poller, it is no
longer inline
- inline = false;
- if (read) {
- registerReadInterest();
+ if (nBytes > 0) {
+ // The bytes read are only updated in the completion
handler
+ completion.completed(Long.valueOf(nBytes), this);
+ } else if (nBytes < 0 || getError() != null) {
+ IOException error = getError();
+ if (error == null) {
+ error = new EOFException();
+ }
+ completion.failed(error, this);
} else {
- registerWriteInterest();
+ // As soon as the operation uses the poller, it is no
longer inline
+ inline = false;
+ if (read) {
+ registerReadInterest();
+ } else {
+ registerWriteInterest();
+ }
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]