Author: remm
Date: Mon Apr 30 15:28:26 2018
New Revision: 1830592
URL: http://svn.apache.org/viewvc?rev=1830592&view=rev
Log:
Add async IO API use in websockets writes. Although I doubt there's an actual benefit at the moment, the change is small and it still improves testing of the API as the usage is different from HTTP/2. Tested with the testsuite, the examples and Autobahn.
Modified: tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java tomcat/trunk/webapps/docs/changelog.xml Modified: tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java?rev=1830592&r1=1830591&r2=1830592&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java (original) +++ tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java Mon Apr 30 15:28:26 2018 @@ -20,7 +20,10 @@ import java.io.EOFException; import java.io.IOException; import java.net.SocketTimeoutException; import java.nio.ByteBuffer; +import java.nio.channels.CompletionHandler; +import java.nio.channels.InterruptedByTimeoutException; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; import javax.websocket.SendHandler; import javax.websocket.SendResult; @@ -28,6 +31,10 @@ import javax.websocket.SendResult; import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; import org.apache.tomcat.util.net.SocketWrapperBase; +import org.apache.tomcat.util.net.SocketWrapperBase.BlockingMode; +import org.apache.tomcat.util.net.SocketWrapperBase.CompletionCheck; +import org.apache.tomcat.util.net.SocketWrapperBase.CompletionHandlerCall; +import org.apache.tomcat.util.net.SocketWrapperBase.CompletionState; import org.apache.tomcat.util.res.StringManager; import org.apache.tomcat.websocket.Transformation; import org.apache.tomcat.websocket.WsRemoteEndpointImplBase; @@ -62,20 +69,95 @@ public class WsRemoteEndpointImplServer return false; } - @Override protected void doWrite(SendHandler handler, long blockingWriteTimeoutExpiry, ByteBuffer... 
buffers) { - if (blockingWriteTimeoutExpiry == -1) { - this.handler = handler; - this.buffers = buffers; - // This is definitely the same thread that triggered the write so a - // dispatch will be required. - onWritePossible(true); + if (socketWrapper.hasAsyncIO()) { + final boolean block = (blockingWriteTimeoutExpiry != -1); + long timeout = -1; + if (block) { + timeout = blockingWriteTimeoutExpiry - System.currentTimeMillis(); + if (timeout <= 0) { + SendResult sr = new SendResult(new SocketTimeoutException()); + handler.onResult(sr); + return; + } + } else { + this.handler = handler; + if (timeout > 0) { + // Register with timeout thread + timeoutExpiry = timeout + System.currentTimeMillis(); + wsWriteTimeout.register(this); + } + timeout = getSendTimeout(); + } + socketWrapper.write(block ? BlockingMode.BLOCK : BlockingMode.SEMI_BLOCK, timeout, + TimeUnit.MILLISECONDS, null, + new CompletionCheck() { + @Override + public CompletionHandlerCall callHandler(CompletionState state, ByteBuffer[] buffers, + int offset, int length) { + for (int i = 0; i < length; i++) { + if (buffers[offset + i].remaining() > 0) { + return CompletionHandlerCall.CONTINUE; + } + } + return CompletionHandlerCall.DONE; + } + }, + new CompletionHandler<Long, Void>() { + @Override + public void completed(Long result, Void attachment) { + if (block) { + long timeout = blockingWriteTimeoutExpiry - System.currentTimeMillis(); + if (timeout <= 0) { + failed(new SocketTimeoutException(), null); + } else { + handler.onResult(SENDRESULT_OK); + } + } else { + wsWriteTimeout.unregister(WsRemoteEndpointImplServer.this); + clearHandler(null, true); + if (close) { + close(); + } + } + } + @Override + public void failed(Throwable exc, Void attachment) { + if (exc instanceof InterruptedByTimeoutException) { + exc = new SocketTimeoutException(); + } + if (block) { + SendResult sr = new SendResult(exc); + handler.onResult(sr); + } else { + wsWriteTimeout.unregister(WsRemoteEndpointImplServer.this); + 
clearHandler(exc, true); + close(); + } + } + }, buffers); } else { - // Blocking - try { - for (ByteBuffer buffer : buffers) { + if (blockingWriteTimeoutExpiry == -1) { + this.handler = handler; + this.buffers = buffers; + // This is definitely the same thread that triggered the write so a + // dispatch will be required. + onWritePossible(true); + } else { + // Blocking + try { + for (ByteBuffer buffer : buffers) { + long timeout = blockingWriteTimeoutExpiry - System.currentTimeMillis(); + if (timeout <= 0) { + SendResult sr = new SendResult(new SocketTimeoutException()); + handler.onResult(sr); + return; + } + socketWrapper.setWriteTimeout(timeout); + socketWrapper.write(true, buffer); + } long timeout = blockingWriteTimeoutExpiry - System.currentTimeMillis(); if (timeout <= 0) { SendResult sr = new SendResult(new SocketTimeoutException()); @@ -83,26 +165,19 @@ public class WsRemoteEndpointImplServer return; } socketWrapper.setWriteTimeout(timeout); - socketWrapper.write(true, buffer); - } - long timeout = blockingWriteTimeoutExpiry - System.currentTimeMillis(); - if (timeout <= 0) { - SendResult sr = new SendResult(new SocketTimeoutException()); + socketWrapper.flush(true); + handler.onResult(SENDRESULT_OK); + } catch (IOException e) { + SendResult sr = new SendResult(e); handler.onResult(sr); - return; } - socketWrapper.setWriteTimeout(timeout); - socketWrapper.flush(true); - handler.onResult(SENDRESULT_OK); - } catch (IOException e) { - SendResult sr = new SendResult(e); - handler.onResult(sr); } } } public void onWritePossible(boolean useDispatch) { + // Note: Unused for async IO ByteBuffer[] buffers = this.buffers; if (buffers == null) { // Servlet 3.1 will call the write listener once even if nothing Modified: tomcat/trunk/webapps/docs/changelog.xml URL: http://svn.apache.org/viewvc/tomcat/trunk/webapps/docs/changelog.xml?rev=1830592&r1=1830591&r2=1830592&view=diff ============================================================================== --- 
tomcat/trunk/webapps/docs/changelog.xml (original) +++ tomcat/trunk/webapps/docs/changelog.xml Mon Apr 30 15:28:26 2018 @@ -57,6 +57,13 @@ </fix> </changelog> </subsection> + <subsection name="WebSocket"> + <changelog> + <update> + Use NIO2 API for websockets writes. (remm) + </update> + </changelog> + </subsection> </section> <section name="Tomcat 9.0.8 (markt)" rtext="release in progress"> <subsection name="Catalina"> --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org