Author: remm
Date: Mon Apr 30 15:28:26 2018
New Revision: 1830592

URL: http://svn.apache.org/viewvc?rev=1830592&view=rev
Log:
Use the async IO API for WebSocket writes. Although I doubt there is an actual 
benefit at the moment, the change is small and it still improves testing of the 
API, as the usage is different from HTTP/2. Tested with the test suite, the 
examples and Autobahn.

Modified:
    
tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java
    tomcat/trunk/webapps/docs/changelog.xml

Modified: 
tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java?rev=1830592&r1=1830591&r2=1830592&view=diff
==============================================================================
--- 
tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java
 (original)
+++ 
tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java
 Mon Apr 30 15:28:26 2018
@@ -20,7 +20,10 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
+import java.nio.channels.CompletionHandler;
+import java.nio.channels.InterruptedByTimeoutException;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
 
 import javax.websocket.SendHandler;
 import javax.websocket.SendResult;
@@ -28,6 +31,10 @@ import javax.websocket.SendResult;
 import org.apache.juli.logging.Log;
 import org.apache.juli.logging.LogFactory;
 import org.apache.tomcat.util.net.SocketWrapperBase;
+import org.apache.tomcat.util.net.SocketWrapperBase.BlockingMode;
+import org.apache.tomcat.util.net.SocketWrapperBase.CompletionCheck;
+import org.apache.tomcat.util.net.SocketWrapperBase.CompletionHandlerCall;
+import org.apache.tomcat.util.net.SocketWrapperBase.CompletionState;
 import org.apache.tomcat.util.res.StringManager;
 import org.apache.tomcat.websocket.Transformation;
 import org.apache.tomcat.websocket.WsRemoteEndpointImplBase;
@@ -62,20 +69,95 @@ public class WsRemoteEndpointImplServer
         return false;
     }
 
-
     @Override
     protected void doWrite(SendHandler handler, long 
blockingWriteTimeoutExpiry,
             ByteBuffer... buffers) {
-        if (blockingWriteTimeoutExpiry == -1) {
-            this.handler = handler;
-            this.buffers = buffers;
-            // This is definitely the same thread that triggered the write so a
-            // dispatch will be required.
-            onWritePossible(true);
+        if (socketWrapper.hasAsyncIO()) {
+            final boolean block = (blockingWriteTimeoutExpiry != -1);
+            long timeout = -1;
+            if (block) {
+                timeout = blockingWriteTimeoutExpiry - 
System.currentTimeMillis();
+                if (timeout <= 0) {
+                    SendResult sr = new SendResult(new 
SocketTimeoutException());
+                    handler.onResult(sr);
+                    return;
+                }
+            } else {
+                this.handler = handler;
+                if (timeout > 0) {
+                    // Register with timeout thread
+                    timeoutExpiry = timeout + System.currentTimeMillis();
+                    wsWriteTimeout.register(this);
+                }
+                timeout = getSendTimeout();
+            }
+            socketWrapper.write(block ? BlockingMode.BLOCK : 
BlockingMode.SEMI_BLOCK, timeout,
+                    TimeUnit.MILLISECONDS, null,
+                    new CompletionCheck() {
+                        @Override
+                        public CompletionHandlerCall 
callHandler(CompletionState state, ByteBuffer[] buffers,
+                                int offset, int length) {
+                            for (int i = 0; i < length; i++) {
+                                if (buffers[offset + i].remaining() > 0) {
+                                    return CompletionHandlerCall.CONTINUE;
+                                }
+                            }
+                            return CompletionHandlerCall.DONE;
+                        }
+                    },
+                    new CompletionHandler<Long, Void>() {
+                        @Override
+                        public void completed(Long result, Void attachment) {
+                            if (block) {
+                                long timeout = blockingWriteTimeoutExpiry - 
System.currentTimeMillis();
+                                if (timeout <= 0) {
+                                    failed(new SocketTimeoutException(), null);
+                                } else {
+                                    handler.onResult(SENDRESULT_OK);
+                                }
+                            } else {
+                                
wsWriteTimeout.unregister(WsRemoteEndpointImplServer.this);
+                                clearHandler(null, true);
+                                if (close) {
+                                    close();
+                                }
+                            }
+                        }
+                        @Override
+                        public void failed(Throwable exc, Void attachment) {
+                            if (exc instanceof InterruptedByTimeoutException) {
+                                exc = new SocketTimeoutException();
+                            }
+                            if (block) {
+                                SendResult sr = new SendResult(exc);
+                                handler.onResult(sr);
+                            } else {
+                                
wsWriteTimeout.unregister(WsRemoteEndpointImplServer.this);
+                                clearHandler(exc, true);
+                                close();
+                            }
+                        }
+                    }, buffers);
         } else {
-            // Blocking
-            try {
-                for (ByteBuffer buffer : buffers) {
+            if (blockingWriteTimeoutExpiry == -1) {
+                this.handler = handler;
+                this.buffers = buffers;
+                // This is definitely the same thread that triggered the write 
so a
+                // dispatch will be required.
+                onWritePossible(true);
+            } else {
+                // Blocking
+                try {
+                    for (ByteBuffer buffer : buffers) {
+                        long timeout = blockingWriteTimeoutExpiry - 
System.currentTimeMillis();
+                        if (timeout <= 0) {
+                            SendResult sr = new SendResult(new 
SocketTimeoutException());
+                            handler.onResult(sr);
+                            return;
+                        }
+                        socketWrapper.setWriteTimeout(timeout);
+                        socketWrapper.write(true, buffer);
+                    }
                     long timeout = blockingWriteTimeoutExpiry - 
System.currentTimeMillis();
                     if (timeout <= 0) {
                         SendResult sr = new SendResult(new 
SocketTimeoutException());
@@ -83,26 +165,19 @@ public class WsRemoteEndpointImplServer
                         return;
                     }
                     socketWrapper.setWriteTimeout(timeout);
-                    socketWrapper.write(true, buffer);
-                }
-                long timeout = blockingWriteTimeoutExpiry - 
System.currentTimeMillis();
-                if (timeout <= 0) {
-                    SendResult sr = new SendResult(new 
SocketTimeoutException());
+                    socketWrapper.flush(true);
+                    handler.onResult(SENDRESULT_OK);
+                } catch (IOException e) {
+                    SendResult sr = new SendResult(e);
                     handler.onResult(sr);
-                    return;
                 }
-                socketWrapper.setWriteTimeout(timeout);
-                socketWrapper.flush(true);
-                handler.onResult(SENDRESULT_OK);
-            } catch (IOException e) {
-                SendResult sr = new SendResult(e);
-                handler.onResult(sr);
             }
         }
     }
 
 
     public void onWritePossible(boolean useDispatch) {
+        // Note: Unused for async IO
         ByteBuffer[] buffers = this.buffers;
         if (buffers == null) {
             // Servlet 3.1 will call the write listener once even if nothing

Modified: tomcat/trunk/webapps/docs/changelog.xml
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/webapps/docs/changelog.xml?rev=1830592&r1=1830591&r2=1830592&view=diff
==============================================================================
--- tomcat/trunk/webapps/docs/changelog.xml (original)
+++ tomcat/trunk/webapps/docs/changelog.xml Mon Apr 30 15:28:26 2018
@@ -57,6 +57,13 @@
       </fix>
     </changelog>
   </subsection>
+  <subsection name="WebSocket">
+    <changelog>
+      <update>
+        Use NIO2 API for websockets writes. (remm)
+      </update>
+    </changelog>
+  </subsection>
 </section>
 <section name="Tomcat 9.0.8 (markt)" rtext="release in progress">
   <subsection name="Catalina">



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to