Hi, 2014-08-22 14:02 GMT+03:00 <ma...@apache.org>: > > Author: markt > Date: Fri Aug 22 11:02:19 2014 > New Revision: 1619738 > > URL: http://svn.apache.org/r1619738 > Log: > Extend support for the WebSocket permessage-deflate extension to compression of outgoing messages on the server side.
I would like to back-port this to 7.0.x. What do you think? Regards, Violeta > Modified: > tomcat/trunk/java/org/apache/tomcat/websocket/MessagePart.java > tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java > tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java > tomcat/trunk/webapps/docs/changelog.xml > > Modified: tomcat/trunk/java/org/apache/tomcat/websocket/MessagePart.java > URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/MessagePart.java?rev=1619738&r1=1619737&r2=1619738&view=diff > ============================================================================== > --- tomcat/trunk/java/org/apache/tomcat/websocket/MessagePart.java (original) > +++ tomcat/trunk/java/org/apache/tomcat/websocket/MessagePart.java Fri Aug 22 11:02:19 2014 > @@ -25,15 +25,17 @@ class MessagePart { > private final int rsv; > private final byte opCode; > private final ByteBuffer payload; > - private final SendHandler handler; > + private final SendHandler intermediateHandler; > + private volatile SendHandler endHandler; > > public MessagePart( boolean fin, int rsv, byte opCode, ByteBuffer payload, > - SendHandler handler) { > + SendHandler intermediateHandler, SendHandler endHandler) { > this.fin = fin; > this.rsv = rsv; > this.opCode = opCode; > this.payload = payload; > - this.handler = handler; > + this.intermediateHandler = intermediateHandler; > + this.endHandler = endHandler; > } > > > @@ -57,8 +59,17 @@ class MessagePart { > } > > > - public SendHandler getHandler() { > - return handler; > + public SendHandler getIntermediateHandler() { > + return intermediateHandler; > + } > + > + > + public SendHandler getEndHandler() { > + return endHandler; > + } > + > + public void setEndHandler(SendHandler endHandler) { > + this.endHandler = endHandler; > } > } > > > Modified: tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java > URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java?rev=1619738&r1=1619737&r2=1619738&view=diff > ============================================================================== > --- tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java (original) > +++ tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java Fri Aug 22 11:02:19 2014 > @@ -21,10 +21,12 @@ import java.nio.ByteBuffer; > import java.util.ArrayList; > import java.util.List; > import java.util.zip.DataFormatException; > +import java.util.zip.Deflater; > import java.util.zip.Inflater; > > import javax.websocket.Extension; > import javax.websocket.Extension.Parameter; > +import javax.websocket.SendHandler; > > import org.apache.tomcat.util.res.StringManager; > > @@ -47,10 +49,15 @@ public class PerMessageDeflate implement > private final boolean clientContextTakeover; > private final int clientMaxWindowBits; > private final Inflater inflater = new Inflater(true); > - private final ByteBuffer readBuffer = ByteBuffer.allocate(8192); > + private final ByteBuffer readBuffer = ByteBuffer.allocate(Constants.DEFAULT_BUFFER_SIZE); > + private final Deflater deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true); > > private volatile Transformation next; > private volatile boolean skipDecompression = false; > + private volatile ByteBuffer writeBuffer = ByteBuffer.allocate(Constants.DEFAULT_BUFFER_SIZE); > + private volatile boolean deflaterResetRequired = true; > + private volatile boolean firstCompressedFrameWritten = false; > + private volatile byte[] EOM_BUFFER = new byte[EOM_BYTES.length + 1]; > > static PerMessageDeflate negotiate(List<List<Parameter>> preferences) { > // Accept the first preference that the server is able to support > @@ -288,25 +295,143 @@ public class PerMessageDeflate implement > > > @Override > - public List<MessagePart> sendMessagePart(List<MessagePart> messageParts) { > - List<MessagePart> compressedParts = 
new ArrayList<>(messageParts.size()); > + public List<MessagePart> sendMessagePart(List<MessagePart> uncompressedParts) { > + List<MessagePart> allCompressedParts = new ArrayList<>(); > > - for (MessagePart messagePart : messageParts) { > - byte opCode = messagePart.getOpCode(); > + for (MessagePart uncompressedPart : uncompressedParts) { > + byte opCode = uncompressedPart.getOpCode(); > if (Util.isControl(opCode)) { > // Control messages can appear in the middle of other messages > // and must not be compressed. Pass it straight through > - compressedParts.add(messagePart); > + allCompressedParts.add(uncompressedPart); > } else { > - // TODO: Implement compression of sent messages > - compressedParts.add(messagePart); > + List<MessagePart> compressedParts = new ArrayList<>(); > + ByteBuffer uncompressedPayload = uncompressedPart.getPayload(); > + SendHandler uncompressedIntermediateHandler = > + uncompressedPart.getIntermediateHandler(); > + > + // Need to reset the deflater at the start of every message > + if (deflaterResetRequired) { > + deflater.reset(); > + deflaterResetRequired = false; > + firstCompressedFrameWritten = false; > + } > + > + deflater.setInput(uncompressedPayload.array(), > + uncompressedPayload.arrayOffset() + uncompressedPayload.position(), > + uncompressedPayload.remaining()); > + > + int flush = (uncompressedPart.isFin() ? Deflater.SYNC_FLUSH : Deflater.NO_FLUSH); > + boolean deflateRequired = true; > + > + while(deflateRequired) { > + ByteBuffer compressedPayload = writeBuffer; > + > + int written = deflater.deflate(compressedPayload.array(), > + compressedPayload.arrayOffset() + compressedPayload.position(), > + compressedPayload.remaining(), flush); > + compressedPayload.position(compressedPayload.position() + written); > + > + if (!uncompressedPart.isFin() && compressedPayload.hasRemaining() && deflater.needsInput()) { > + // This message part has been fully processed by the > + // deflater. 
Fire the send handler for this message part > + // and move on to the next message part. > + break; > + } > + > + // If this point is reached, a new compressed message part > + // will be created... > + MessagePart compressedPart; > + > + // .. and a new writeBuffer will be required. > + writeBuffer = ByteBuffer.allocate(Constants.DEFAULT_BUFFER_SIZE); > + > + // Flip the compressed payload ready for writing > + compressedPayload.flip(); > + > + boolean fin = uncompressedPart.isFin(); > + boolean full = compressedPayload.limit() == compressedPayload.capacity(); > + boolean needsInput = deflater.needsInput(); > + > + if (fin && !full && needsInput) { > + // End of compressed message. Drop EOM bytes and output. > + compressedPayload.limit(compressedPayload.limit() - EOM_BYTES.length); > + compressedPart = new MessagePart(true, getRsv(uncompressedPart), > + opCode, compressedPayload, uncompressedIntermediateHandler, > + uncompressedIntermediateHandler); > + deflaterResetRequired = true; > + deflateRequired = false; > + } else if (full && !needsInput) { > + // Write buffer full and input message not fully read. > + // Output and start new compressed part. > + compressedPart = new MessagePart(false, getRsv(uncompressedPart), > + opCode, compressedPayload, uncompressedIntermediateHandler, > + uncompressedIntermediateHandler); > + } else if (!fin && full && needsInput) { > + // Write buffer full and input message not fully read. > + // Output and get more data. > + compressedPart = new MessagePart(false, getRsv(uncompressedPart), > + opCode, compressedPayload, uncompressedIntermediateHandler, > + uncompressedIntermediateHandler); > + deflateRequired = false; > + } else if (fin && full && needsInput) { > + // Write buffer full. Input fully read. 
Deflater may be > + // in one of four states: > + // - output complete (just happened to align with end of > + // buffer) > + // - in middle of EOM bytes > + // - about to write EOM bytes > + // - more data to write > + int eomBufferWritten = deflater.deflate(EOM_BUFFER, 0, EOM_BUFFER.length, Deflater.SYNC_FLUSH); > + if (eomBufferWritten < EOM_BUFFER.length) { > + // EOM has just been completed > + compressedPayload.limit(compressedPayload.limit() - EOM_BYTES.length + eomBufferWritten); > + compressedPart = new MessagePart(true, > + getRsv(uncompressedPart), opCode, compressedPayload, > + uncompressedIntermediateHandler, uncompressedIntermediateHandler); > + deflaterResetRequired = true; > + deflateRequired = false; > + } else { > + // More data to write > + // Copy bytes to new write buffer > + writeBuffer.put(EOM_BUFFER, 0, eomBufferWritten); > + compressedPart = new MessagePart(false, > + getRsv(uncompressedPart), opCode, compressedPayload, > + uncompressedIntermediateHandler, uncompressedIntermediateHandler); > + } > + } else { > + throw new IllegalStateException("Should never happen"); > + } > + > + // Add the newly created compressed part to the set of parts > + // to pass on to the next transformation. 
> + compressedParts.add(compressedPart); > + } > + > + SendHandler uncompressedEndHandler = uncompressedPart.getEndHandler(); > + int size = compressedParts.size(); > + if (size > 0) { > + compressedParts.get(size - 1).setEndHandler(uncompressedEndHandler); > + } > + > + allCompressedParts.addAll(compressedParts); > } > } > > if (next == null) { > - return compressedParts; > + return allCompressedParts; > } else { > - return next.sendMessagePart(compressedParts); > + return next.sendMessagePart(allCompressedParts); > } > } > + > + > + private int getRsv(MessagePart uncompressedMessagePart) { > + int result = uncompressedMessagePart.getRsv(); > + if (!firstCompressedFrameWritten) { > + result += RSV_BITMASK; > + firstCompressedFrameWritten = true; > + } > + return result; > + } > } > > Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java > URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java?rev=1619738&r1=1619737&r2=1619738&view=diff > ============================================================================== > --- tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java (original) > +++ tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java Fri Aug 22 11:02:19 2014 > @@ -61,6 +61,9 @@ public abstract class WsRemoteEndpointIm > > private final StateMachine stateMachine = new StateMachine(); > > + private final IntermediateMessageHandler intermediateMessageHandler = > + new IntermediateMessageHandler(this); > + > private Transformation transformation = null; > private boolean messagePartInProgress = false; > private final Queue<MessagePart> messagePartQueue = new ArrayDeque<>(); > @@ -258,10 +261,19 @@ public abstract class WsRemoteEndpointIm > > List<MessagePart> messageParts = new ArrayList<>(); > messageParts.add(new MessagePart(last, 0, opCode, payload, > + intermediateMessageHandler, > new EndMessageHandler(this, handler))); > > 
messageParts = transformation.sendMessagePart(messageParts); > > + // Some extensions/transformations may buffer messages so it is possible > + // that no message parts will be returned. If this is the case then > + // trigger the supplied SendHandler > + if (messageParts.size() == 0) { > + handler.onResult(new SendResult()); > + return; > + } > + > MessagePart mp = messageParts.remove(0); > > boolean doWrite = false; > @@ -329,12 +341,15 @@ public abstract class WsRemoteEndpointIm > > wsSession.updateLastActive(); > > - handler.onResult(result); > + // Some handlers, such as the IntermediateMessageHandler, do not have a > + // nested handler so handler may be null. > + if (handler != null) { > + handler.onResult(result); > + } > } > > > void writeMessagePart(MessagePart mp) { > - > if (closed) { > throw new IllegalStateException( > sm.getString("wsRemoteEndpoint.closed")); > @@ -343,7 +358,7 @@ public abstract class WsRemoteEndpointIm > if (Constants.INTERNAL_OPCODE_FLUSH == mp.getOpCode()) { > nextFragmented = fragmented; > nextText = text; > - doWrite(mp.getHandler(), outputBuffer); > + doWrite(mp.getEndHandler(), outputBuffer); > return; > } > > @@ -397,14 +412,13 @@ public abstract class WsRemoteEndpointIm > if (getBatchingAllowed() || isMasked()) { > // Need to write via output buffer > OutputBufferSendHandler obsh = new OutputBufferSendHandler( > - mp.getHandler(), headerBuffer, mp.getPayload(), mask, > + mp.getEndHandler(), headerBuffer, mp.getPayload(), mask, > outputBuffer, !getBatchingAllowed(), this); > obsh.write(); > } else { > // Can write directly > - doWrite(mp.getHandler(), headerBuffer, mp.getPayload()); > + doWrite(mp.getEndHandler(), headerBuffer, mp.getPayload()); > } > - > } > > > @@ -446,6 +460,31 @@ public abstract class WsRemoteEndpointIm > } > > > + /** > + * If a transformation needs to split a {@link MessagePart} into multiple > + * {@link MessagePart}s, it uses this handler as the end handler for each of > + * the additional {@link 
MessagePart}s. This handler notifies this > + * class that the {@link MessagePart} has been processed and that the next > + * {@link MessagePart} in the queue should be started. The final > + * {@link MessagePart} will use the {@link EndMessageHandler} provided with > + * the original {@link MessagePart}. > + */ > + private static class IntermediateMessageHandler implements SendHandler { > + > + private final WsRemoteEndpointImplBase endpoint; > + > + public IntermediateMessageHandler(WsRemoteEndpointImplBase endpoint) { > + this.endpoint = endpoint; > + } > + > + > + @Override > + public void onResult(SendResult result) { > + endpoint.endMessage(null, result); > + } > + } > + > + > public void sendObject(Object obj) throws IOException { > Future<Void> f = sendObjectByFuture(obj); > try { > > Modified: tomcat/trunk/webapps/docs/changelog.xml > URL: http://svn.apache.org/viewvc/tomcat/trunk/webapps/docs/changelog.xml?rev=1619738&r1=1619737&r2=1619738&view=diff > ============================================================================== > --- tomcat/trunk/webapps/docs/changelog.xml (original) > +++ tomcat/trunk/webapps/docs/changelog.xml Fri Aug 22 11:02:19 2014 > @@ -54,6 +54,14 @@ > </fix> > </changelog> > </subsection> > + <subsection name="WebSocket"> > + <changelog> > + <add> > + Extend support for the <code>permessage-deflate</code> extension to > + compression of outgoing messages on the server side. (markt) > + </add> > + </changelog> > + </subsection> > <subsection name="Other"> > <changelog> > <add> > > > > --------------------------------------------------------------------- > To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org > For additional commands, e-mail: dev-h...@tomcat.apache.org >