Author: remm
Date: Tue Mar 28 08:15:05 2017
New Revision: 1789065

URL: http://svn.apache.org/viewvc?rev=1789065&view=rev
Log:
Better blocking for async IO.

Modified:
    tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java?rev=1789065&r1=1789064&r2=1789065&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Tue Mar 28 08:15:05 2017
@@ -852,14 +852,16 @@ public class Nio2Endpoint extends Abstra
             private final A attachment;
             private final long timeout;
             private final TimeUnit unit;
+            private final BlockingMode block;
             private final CompletionCheck check;
             private final CompletionHandler<Long, ? super A> handler;
             private OperationState(ByteBuffer[] buffers, int offset, int length,
-                    long timeout, TimeUnit unit, A attachment, CompletionCheck check,
-                    CompletionHandler<Long, ? super A> handler) {
+                    BlockingMode block, long timeout, TimeUnit unit, A attachment,
+                    CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
                 this.buffers = buffers;
                 this.offset = offset;
                 this.length = length;
+                this.block = block;
                 this.timeout = timeout;
                 this.unit = unit;
                 this.attachment = attachment;
@@ -894,7 +896,14 @@ public class Nio2Endpoint extends Abstra
                     }
                     if (complete) {
                         readPending.release();
-                        state.state = currentState;
+                        if (state.block == BlockingMode.BLOCK && currentState != CompletionState.INLINE) {
+                            synchronized (this) {
+                                state.state = currentState;
+                                notify();
+                            }
+                        } else {
+                            state.state = currentState;
+                        }
                         if (completion && state.handler != null) {
                             state.handler.completed(Long.valueOf(state.nBytes), state.attachment);
                         }
@@ -914,11 +923,18 @@ public class Nio2Endpoint extends Abstra
                 }
                 setError(ioe);
                 readPending.release();
+                if (state.block == BlockingMode.BLOCK) {
+                    synchronized (this) {
+                        state.state = Nio2Endpoint.isInline() ? CompletionState.ERROR : CompletionState.DONE;
+                        notify();
+                    }
+                } else {
+                    state.state = Nio2Endpoint.isInline() ? CompletionState.ERROR : CompletionState.DONE;
+                }
                 if (exc instanceof AsynchronousCloseException) {
                     // If already closed, don't call onError and close again
                     return;
                 }
-                state.state = Nio2Endpoint.isInline() ? CompletionState.ERROR : CompletionState.DONE;
                 if (state.handler != null) {
                     state.handler.failed(ioe, state.attachment);
                 }
@@ -949,7 +965,14 @@ public class Nio2Endpoint extends Abstra
                     }
                     if (complete) {
                         writePending.release();
-                        state.state = currentState;
+                        if (state.block == BlockingMode.BLOCK && currentState != CompletionState.INLINE) {
+                            synchronized (this) {
+                                state.state = currentState;
+                                notify();
+                            }
+                        } else {
+                            state.state = currentState;
+                        }
                         if (completion && state.handler != null) {
                             state.handler.completed(Long.valueOf(state.nBytes), state.attachment);
                         }
@@ -969,7 +992,14 @@ public class Nio2Endpoint extends Abstra
                 }
                 setError(ioe);
                 writePending.release();
-                state.state = Nio2Endpoint.isInline() ? CompletionState.ERROR : CompletionState.DONE;
+                if (state.block == BlockingMode.BLOCK) {
+                    synchronized (this) {
+                        state.state = Nio2Endpoint.isInline() ? CompletionState.ERROR : CompletionState.DONE;
+                        notify();
+                    }
+                } else {
+                    state.state = Nio2Endpoint.isInline() ? CompletionState.ERROR : CompletionState.DONE;
+                }
                 if (state.handler != null) {
                     state.handler.failed(ioe, state.attachment);
                 }
@@ -995,14 +1025,25 @@ public class Nio2Endpoint extends Abstra
                     return CompletionState.NOT_DONE;
                 }
             }
-            OperationState<A> state = new OperationState<>(dsts, offset, length, timeout, unit, attachment, check, handler);
+            OperationState<A> state = new OperationState<>(dsts, offset, length, block, timeout, unit, attachment, check, handler);
+            ScatterReadCompletionHandler<A> completion = new ScatterReadCompletionHandler<>();
             Nio2Endpoint.startInline();
-            getSocket().read(dsts, offset, length, timeout, unit, state, new ScatterReadCompletionHandler<>());
+            getSocket().read(dsts, offset, length, timeout, unit, state, completion);
             Nio2Endpoint.endInline();
-            if (block == BlockingMode.BLOCK && state.state == CompletionState.PENDING) {
-                if (!awaitReadComplete(timeout, unit)) {
-                    handler.failed(new SocketTimeoutException(), attachment);
-                    return CompletionState.ERROR;
+            if (block == BlockingMode.BLOCK) {
+                synchronized (completion) {
+                    if (state.state == CompletionState.PENDING) {
+                        try {
+                            completion.wait(unit.toMillis(timeout));
+                            if (state.state == CompletionState.PENDING) {
+                                handler.failed(new SocketTimeoutException(), attachment);
+                                return CompletionState.ERROR;
+                            }
+                        } catch (InterruptedException e) {
+                            handler.failed(new SocketTimeoutException(), attachment);
+                            return CompletionState.ERROR;
+                        }
+                    }
                 }
             }
             return state.state;
@@ -1016,9 +1057,14 @@ public class Nio2Endpoint extends Abstra
         }
 
         @Override
-        public  <A> CompletionState write(ByteBuffer[] srcs, int offset, int length,
+        public <A> CompletionState write(ByteBuffer[] srcs, int offset, int length,
                 BlockingMode block, long timeout, TimeUnit unit, A attachment,
                 CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
+            IOException ioe = getError();
+            if (ioe != null) {
+                handler.failed(ioe, attachment);
+                return CompletionState.ERROR;
+            }
             if (block != BlockingMode.NON_BLOCK) {
                 try {
                     if (!writePending.tryAcquire(timeout, unit)) {
@@ -1034,14 +1080,25 @@ public class Nio2Endpoint extends Abstra
                     return CompletionState.NOT_DONE;
                 }
             }
-            OperationState<A> state = new OperationState<>(srcs, offset, length, timeout, unit, attachment, check, handler);
+            OperationState<A> state = new OperationState<>(srcs, offset, length, block, timeout, unit, attachment, check, handler);
+            GatherWriteCompletionHandler<A> completion = new GatherWriteCompletionHandler<>();
             Nio2Endpoint.startInline();
-            getSocket().write(srcs, offset, length, timeout, unit, state, new GatherWriteCompletionHandler<>());
+            getSocket().write(srcs, offset, length, timeout, unit, state, completion);
             Nio2Endpoint.endInline();
-            if (block == BlockingMode.BLOCK && state.state == CompletionState.PENDING) {
-                if (!awaitWriteComplete(timeout, unit)) {
-                    handler.failed(new SocketTimeoutException(), attachment);
-                    return CompletionState.ERROR;
+            if (block == BlockingMode.BLOCK) {
+                synchronized (completion) {
+                    if (state.state == CompletionState.PENDING) {
+                        try {
+                            completion.wait(unit.toMillis(timeout));
+                            if (state.state == CompletionState.PENDING) {
+                                handler.failed(new SocketTimeoutException(), attachment);
+                                return CompletionState.ERROR;
+                            }
+                        } catch (InterruptedException e) {
+                            handler.failed(new SocketTimeoutException(), attachment);
+                            return CompletionState.ERROR;
+                        }
+                    }
                 }
             }
             return state.state;
@@ -1282,11 +1339,13 @@ public class Nio2Endpoint extends Abstra
             try {
                 if (readPending.tryAcquire(timeout, unit)) {
                     readPending.release();
+                    return true;
+                } else {
+                    return false;
                 }
             } catch (InterruptedException e) {
                 return false;
             }
-            return true;
         }
 
 
@@ -1295,11 +1354,13 @@ public class Nio2Endpoint extends Abstra
             try {
                 if (writePending.tryAcquire(timeout, unit)) {
                     writePending.release();
+                    return true;
+                } else {
+                    return false;
                 }
             } catch (InterruptedException e) {
                 return false;
             }
-            return true;
         }
 
         /*



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to