Author: markt Date: Fri Feb 27 15:00:39 2015 New Revision: 1662698 URL: http://svn.apache.org/r1662698 Log: Switch to using blocking writes directly
Modified: tomcat/trunk/java/org/apache/tomcat/websocket/MessagePart.java tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplClient.java tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java Modified: tomcat/trunk/java/org/apache/tomcat/websocket/MessagePart.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/MessagePart.java?rev=1662698&r1=1662697&r2=1662698&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/websocket/MessagePart.java (original) +++ tomcat/trunk/java/org/apache/tomcat/websocket/MessagePart.java Fri Feb 27 15:00:39 2015 @@ -27,15 +27,18 @@ class MessagePart { private final ByteBuffer payload; private final SendHandler intermediateHandler; private volatile SendHandler endHandler; + private final long blockingWriteTimeoutExpiry; public MessagePart( boolean fin, int rsv, byte opCode, ByteBuffer payload, - SendHandler intermediateHandler, SendHandler endHandler) { + SendHandler intermediateHandler, SendHandler endHandler, + long blockingWriteTimeoutExpiry) { this.fin = fin; this.rsv = rsv; this.opCode = opCode; this.payload = payload; this.intermediateHandler = intermediateHandler; this.endHandler = endHandler; + this.blockingWriteTimeoutExpiry = blockingWriteTimeoutExpiry; } @@ -71,6 +74,10 @@ class MessagePart { public void setEndHandler(SendHandler endHandler) { this.endHandler = endHandler; } + + public long getBlockingWriteTimeoutExpiry() { + return blockingWriteTimeoutExpiry; + } } Modified: tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java?rev=1662698&r1=1662697&r2=1662698&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java (original) +++ tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java Fri Feb 27 15:00:39 2015 @@ -362,13 +362,14 @@ public class PerMessageDeflate implement boolean fin = uncompressedPart.isFin(); boolean full = compressedPayload.limit() == compressedPayload.capacity(); boolean needsInput = deflater.needsInput(); + long blockingWriteTimeoutExpiry = uncompressedPart.getBlockingWriteTimeoutExpiry(); if (fin && !full && needsInput) { // End of compressed message. Drop EOM bytes and output. compressedPayload.limit(compressedPayload.limit() - EOM_BYTES.length); compressedPart = new MessagePart(true, getRsv(uncompressedPart), opCode, compressedPayload, uncompressedIntermediateHandler, - uncompressedIntermediateHandler); + uncompressedIntermediateHandler, blockingWriteTimeoutExpiry); deflateRequired = false; startNewMessage(); } else if (full && !needsInput) { @@ -376,13 +377,13 @@ public class PerMessageDeflate implement // Output and start new compressed part. compressedPart = new MessagePart(false, getRsv(uncompressedPart), opCode, compressedPayload, uncompressedIntermediateHandler, - uncompressedIntermediateHandler); + uncompressedIntermediateHandler, blockingWriteTimeoutExpiry); } else if (!fin && full && needsInput) { // Write buffer full and input message not fully read. // Output and get more data. compressedPart = new MessagePart(false, getRsv(uncompressedPart), opCode, compressedPayload, uncompressedIntermediateHandler, - uncompressedIntermediateHandler); + uncompressedIntermediateHandler, blockingWriteTimeoutExpiry); deflateRequired = false; } else if (fin && full && needsInput) { // Write buffer full. Input fully read. Deflater may be @@ -398,7 +399,8 @@ public class PerMessageDeflate implement compressedPayload.limit(compressedPayload.limit() - EOM_BYTES.length + eomBufferWritten); compressedPart = new MessagePart(true, getRsv(uncompressedPart), opCode, compressedPayload, - uncompressedIntermediateHandler, uncompressedIntermediateHandler); + uncompressedIntermediateHandler, uncompressedIntermediateHandler, + blockingWriteTimeoutExpiry); deflateRequired = false; startNewMessage(); } else { @@ -407,7 +409,8 @@ public class PerMessageDeflate implement writeBuffer.put(EOM_BUFFER, 0, eomBufferWritten); compressedPart = new MessagePart(false, getRsv(uncompressedPart), opCode, compressedPayload, - uncompressedIntermediateHandler, uncompressedIntermediateHandler); + uncompressedIntermediateHandler, uncompressedIntermediateHandler, + blockingWriteTimeoutExpiry); } } else { throw new IllegalStateException("Should never happen"); Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java?rev=1662698&r1=1662697&r2=1662698&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java (original) +++ tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java Fri Feb 27 15:00:39 2015 @@ -29,6 +29,7 @@ import java.util.List; import java.util.Queue; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -57,6 +58,8 @@ public abstract class WsRemoteEndpointIm public static final String BLOCKING_SEND_TIMEOUT_PROPERTY = "org.apache.tomcat.websocket.BLOCKING_SEND_TIMEOUT"; + protected static final SendResult SENDRESULT_OK = new SendResult(); + private final Log log = LogFactory.getLog(WsRemoteEndpointImplBase.class); private final StateMachine stateMachine = new StateMachine(); @@ -65,7 +68,7 @@ public abstract class WsRemoteEndpointIm new IntermediateMessageHandler(this); private Transformation transformation = null; - private boolean messagePartInProgress = false; + private final Semaphore messagePartInProgress = new Semaphore(1); private final Queue<MessagePart> messagePartQueue = new ArrayDeque<>(); private final Object messagePartLock = new Object(); @@ -266,21 +269,53 @@ public abstract class WsRemoteEndpointIm // trigger a session close and depending on timing the client // session may close before we can read the timeout. long timeout = getBlockingSendTimeout(); - FutureToSendHandler f2sh = new FutureToSendHandler(wsSession); - startMessage(opCode, payload, last, f2sh); - try { - if (timeout == -1) { - f2sh.get(); - } else { - f2sh.get(timeout, TimeUnit.MILLISECONDS); + long timeoutExpiry; + if (timeout < 0) { + timeoutExpiry = Long.MAX_VALUE; + } else { + timeoutExpiry = System.currentTimeMillis() + timeout; + } + + wsSession.updateLastActive(); + + BlockingSendHandler bsh = new BlockingSendHandler(); + + List<MessagePart> messageParts = new ArrayList<>(); + messageParts.add(new MessagePart(last, 0, opCode, payload, bsh, bsh, timeoutExpiry)); + + messageParts = transformation.sendMessagePart(messageParts); + + // Some extensions/transformations may buffer messages so it is possible + // that no message parts will be returned. If this is the case simply + // return. + if (messageParts.size() == 0) { + return; + } + + synchronized (messagePartLock) { + try { + if (!messagePartInProgress.tryAcquire(timeout, TimeUnit.MILLISECONDS)) { + // TODO i18n + throw new IOException(); + } + } catch (InterruptedException e) { + // TODO i18n + throw new IOException(e); } - if (payload != null) { - payload.clear(); + } + + for (MessagePart mp : messageParts) { + writeMessagePart(mp); + if (!bsh.getSendResult().isOK()) { + throw new IOException (bsh.getSendResult().getException()); } - } catch (InterruptedException | ExecutionException | - TimeoutException e) { - throw new IOException(e); } + + if (payload != null) { + payload.clear(); + } + + endMessage(null, null); } @@ -292,7 +327,7 @@ public abstract class WsRemoteEndpointIm List<MessagePart> messageParts = new ArrayList<>(); messageParts.add(new MessagePart(last, 0, opCode, payload, intermediateMessageHandler, - new EndMessageHandler(this, handler))); + new EndMessageHandler(this, handler), -1)); messageParts = transformation.sendMessagePart(messageParts); @@ -313,7 +348,9 @@ public abstract class WsRemoteEndpointIm // the session has been closed. Complain loudly. log.warn(sm.getString("wsRemoteEndpoint.flushOnCloseFailed")); } - if (messagePartInProgress) { + if (messagePartInProgress.tryAcquire()) { + doWrite = true; + } else { // When a control message is sent while another message is being // sent, the control message is queued. Chances are the // subsequent data message part will end up queued while the @@ -324,9 +361,6 @@ public abstract class WsRemoteEndpointIm // Add it to the queue messagePartQueue.add(mp); - } else { - messagePartInProgress = true; - doWrite = true; } // Add any remaining messages to the queue messagePartQueue.addAll(messageParts); @@ -350,7 +384,7 @@ public abstract class WsRemoteEndpointIm mpNext = messagePartQueue.poll(); if (mpNext == null) { - messagePartInProgress = false; + messagePartInProgress.release(); } else if (!closed){ // Session may have been closed unexpectedly in the middle of // sending a fragmented message closing the endpoint. If this @@ -388,7 +422,7 @@ public abstract class WsRemoteEndpointIm outputBuffer.flip(); SendHandler flushHandler = new OutputBufferFlushSendHandler( outputBuffer, mp.getEndHandler()); - doWrite(flushHandler, outputBuffer); + doWrite(flushHandler, mp.getBlockingWriteTimeoutExpiry(), outputBuffer); return; } @@ -442,12 +476,14 @@ public abstract class WsRemoteEndpointIm if (getBatchingAllowed() || isMasked()) { // Need to write via output buffer OutputBufferSendHandler obsh = new OutputBufferSendHandler( - mp.getEndHandler(), headerBuffer, mp.getPayload(), mask, + mp.getEndHandler(), mp.getBlockingWriteTimeoutExpiry(), + headerBuffer, mp.getPayload(), mask, outputBuffer, !getBatchingAllowed(), this); obsh.write(); } else { // Can write directly - doWrite(mp.getEndHandler(), headerBuffer, mp.getPayload()); + doWrite(mp.getEndHandler(), mp.getBlockingWriteTimeoutExpiry(), + headerBuffer, mp.getPayload()); } } @@ -639,7 +675,8 @@ public abstract class WsRemoteEndpointIm } - protected abstract void doWrite(SendHandler handler, ByteBuffer... data); + protected abstract void doWrite(SendHandler handler, long blockingWrieTimeoutExpiry, + ByteBuffer... data); protected abstract boolean isMasked(); protected abstract void doClose(); @@ -756,6 +793,7 @@ public abstract class WsRemoteEndpointIm private static class OutputBufferSendHandler implements SendHandler { private final SendHandler handler; + private final long blockingWriteTimeoutExpiry; private final ByteBuffer headerBuffer; private final ByteBuffer payload; private final byte[] mask; @@ -765,9 +803,11 @@ public abstract class WsRemoteEndpointIm private int maskIndex = 0; public OutputBufferSendHandler(SendHandler completion, + long blockingWriteTimeoutExpiry, ByteBuffer headerBuffer, ByteBuffer payload, byte[] mask, ByteBuffer outputBuffer, boolean flushRequired, WsRemoteEndpointImplBase endpoint) { + this.blockingWriteTimeoutExpiry = blockingWriteTimeoutExpiry; this.handler = completion; this.headerBuffer = headerBuffer; this.payload = payload; @@ -785,7 +825,7 @@ public abstract class WsRemoteEndpointIm if (headerBuffer.hasRemaining()) { // Still more headers to write, need to flush outputBuffer.flip(); - endpoint.doWrite(this, outputBuffer); + endpoint.doWrite(this, blockingWriteTimeoutExpiry, outputBuffer); return; } @@ -819,7 +859,7 @@ public abstract class WsRemoteEndpointIm payload.limit(payloadLimit); // Still more headers to write, need to flush outputBuffer.flip(); - endpoint.doWrite(this, outputBuffer); + endpoint.doWrite(this, blockingWriteTimeoutExpiry, outputBuffer); return; } @@ -828,7 +868,7 @@ public abstract class WsRemoteEndpointIm if (outputBuffer.remaining() == 0) { handler.onResult(new SendResult()); } else { - endpoint.doWrite(this, outputBuffer); + endpoint.doWrite(this, blockingWriteTimeoutExpiry, outputBuffer); } } else { handler.onResult(new SendResult()); @@ -840,7 +880,7 @@ public abstract class WsRemoteEndpointIm public void onResult(SendResult result) { if (result.isOK()) { if (outputBuffer.hasRemaining()) { - endpoint.doWrite(this, outputBuffer); + endpoint.doWrite(this, blockingWriteTimeoutExpiry, outputBuffer); } else { outputBuffer.clear(); write(); @@ -1169,4 +1209,19 @@ public abstract class WsRemoteEndpointIm handler.onResult(result); } } + + + private static class BlockingSendHandler implements SendHandler { + + private SendResult sendResult = null; + + @Override + public void onResult(SendResult result) { + sendResult = result; + } + + public SendResult getSendResult() { + return sendResult; + } + } } Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplClient.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplClient.java?rev=1662698&r1=1662697&r2=1662698&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplClient.java (original) +++ tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplClient.java Fri Feb 27 15:00:39 2015 @@ -16,10 +16,14 @@ */ package org.apache.tomcat.websocket; +import java.io.IOException; import java.nio.ByteBuffer; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import javax.websocket.SendHandler; +import javax.websocket.SendResult; public class WsRemoteEndpointImplClient extends WsRemoteEndpointImplBase { @@ -37,20 +41,31 @@ public class WsRemoteEndpointImplClient @Override - protected void doWrite(SendHandler handler, ByteBuffer... data) { - long timeout = getSendTimeout(); - if (timeout < 1) { - timeout = Long.MAX_VALUE; - - } - SendHandlerToCompletionHandler sh2ch = - new SendHandlerToCompletionHandler(handler); - try { - channel.write(data, 0, data.length, timeout, TimeUnit.MILLISECONDS, - null, sh2ch); - } catch (IllegalStateException ise) { - sh2ch.failed(ise, null); + protected void doWrite(SendHandler handler, long blockingWriteTimeoutExpiry, + ByteBuffer... data) { + long timeout; + for (ByteBuffer byteBuffer : data) { + if (blockingWriteTimeoutExpiry == -1) { + timeout = getSendTimeout(); + if (timeout < 1) { + timeout = Long.MAX_VALUE; + } + } else { + timeout = blockingWriteTimeoutExpiry - System.currentTimeMillis(); + if (timeout < 0) { + SendResult sr = new SendResult(new IOException("Blocking write timeout")); + handler.onResult(sr); + } + } + + try { + channel.write(byteBuffer).get(timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + handler.onResult(new SendResult(e)); + return; + } } + handler.onResult(SENDRESULT_OK); } @Override Modified: tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java?rev=1662698&r1=1662697&r2=1662698&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java (original) +++ tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java Fri Feb 27 15:00:39 2015 @@ -72,12 +72,44 @@ public class WsRemoteEndpointImplServer @Override - protected void doWrite(SendHandler handler, ByteBuffer... buffers) { - this.handler = handler; - this.buffers = buffers; - // This is definitely the same thread that triggered the write so a - // dispatch will be required. - onWritePossible(true); + protected void doWrite(SendHandler handler, long blockingWriteTimeoutExpiry, + ByteBuffer... buffers) { + if (blockingWriteTimeoutExpiry == -1) { + this.handler = handler; + this.buffers = buffers; + // This is definitely the same thread that triggered the write so a + // dispatch will be required. + onWritePossible(true); + } else { + // Blocking + for (ByteBuffer buffer : buffers) { + long timeout = blockingWriteTimeoutExpiry - System.currentTimeMillis(); + if (timeout < 0) { + // TODO i18n + SendResult sr = new SendResult(new IOException("Blocking write timeout")); + handler.onResult(sr); + return; + } + socketWrapper.setWriteTimeout(timeout); + try { + socketWrapper.write(true, buffer.array(), buffer.arrayOffset(), + buffer.limit()); + timeout = blockingWriteTimeoutExpiry - System.currentTimeMillis(); + if (timeout < 0) { + // TODO i18n + SendResult sr = new SendResult(new IOException("Blocking write timeout")); + handler.onResult(sr); + return; + } + socketWrapper.setWriteTimeout(timeout); + socketWrapper.flush(true); + handler.onResult(SENDRESULT_OK); + } catch (IOException e) { + SendResult sr = new SendResult(e); + handler.onResult(sr); + } + } + } } Modified: tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java?rev=1662698&r1=1662697&r2=1662698&view=diff ============================================================================== --- tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java (original) +++ tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java Fri Feb 27 15:00:39 2015 @@ -343,9 +343,9 @@ public class TestWsWebSocketContainer ex Exception exception = null; try { while (true) { + lastSend = System.currentTimeMillis(); Future<Void> f = wsSession.getAsyncRemote().sendBinary( ByteBuffer.wrap(MESSAGE_BINARY_4K)); - lastSend = System.currentTimeMillis(); f.get(); } } catch (Exception e) { @@ -354,8 +354,8 @@ public class TestWsWebSocketContainer ex long timeout = System.currentTimeMillis() - lastSend; - // Clear the server side block and prevent and further blocks to allow - // the server to shutdown cleanly + // Clear the server side block and prevent further blocks to allow the + // server to shutdown cleanly BlockingPojo.clearBlock(); String msg = "Time out was [" + timeout + "] ms"; --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org