This is an automated email from the ASF dual-hosted git repository. johnnyv pushed a commit to branch bugfix/DIRMINA-1142 in repository https://gitbox.apache.org/repos/asf/mina.git
commit 024f23db166c83dcd359221f6baea2c0d0505db2 Author: Jonathan Valliere <john...@apache.org> AuthorDate: Tue Jun 1 20:39:37 2021 -0400 initial work on parallel codec api --- .../codec/AbstractProtocolDecoderOutput.java | 66 +- .../codec/AbstractProtocolEncoderOutput.java | 101 +-- .../mina/filter/codec/ProtocolCodecFilter.java | 918 +++++++++------------ .../mina/filter/codec/ProtocolCodecSession.java | 15 +- .../mina/filter/codec/ProtocolEncoderOutput.java | 42 +- .../apache/mina/http/HttpServerDecoderTest.java | 472 ++++++----- 6 files changed, 714 insertions(+), 900 deletions(-) diff --git a/mina-core/src/main/java/org/apache/mina/filter/codec/AbstractProtocolDecoderOutput.java b/mina-core/src/main/java/org/apache/mina/filter/codec/AbstractProtocolDecoderOutput.java index 23a54c0..2997e6a 100644 --- a/mina-core/src/main/java/org/apache/mina/filter/codec/AbstractProtocolDecoderOutput.java +++ b/mina-core/src/main/java/org/apache/mina/filter/codec/AbstractProtocolDecoderOutput.java @@ -19,41 +19,49 @@ */ package org.apache.mina.filter.codec; -import java.util.LinkedList; +import java.util.ArrayDeque; import java.util.Queue; +import org.apache.mina.core.filterchain.IoFilter.NextFilter; +import org.apache.mina.core.session.IoSession; + /** * A {@link ProtocolDecoderOutput} based on queue. * * @author <a href="http://mina.apache.org">Apache MINA Project</a> */ public abstract class AbstractProtocolDecoderOutput implements ProtocolDecoderOutput { - /** The queue where decoded messages are stored */ - private final Queue<Object> messageQueue = new LinkedList<>(); - - /** - * Creates a new instance of a AbstractProtocolDecoderOutput - */ - public AbstractProtocolDecoderOutput() { - // Do nothing - } - - /** - * @return The decoder's message queue - */ - public Queue<Object> getMessageQueue() { - return messageQueue; - } - - /** - * {@inheritDoc} - */ - @Override - public void write(Object message) { - if (message == null) { - throw new IllegalArgumentException("message"); - } - - messageQueue.add(message); - } + /** The queue where decoded messages are stored */ + protected final Queue<Object> messageQueue = new ArrayDeque<>(); + + /** + * Creates a new instance of a AbstractProtocolDecoderOutput + */ + public AbstractProtocolDecoderOutput() { + // Do nothing + } + + /** + * {@inheritDoc} + */ + @Override + public void write(Object message) { + if (message == null) { + throw new IllegalArgumentException("message"); + } + + messageQueue.add(message); + } + + /** + * {@inheritDoc} + */ + @Override + public void flush(NextFilter nextFilter, IoSession session) { + Object message = null; + + while ((message = messageQueue.poll()) != null) { + nextFilter.messageReceived(session, message); + } + } } diff --git a/mina-core/src/main/java/org/apache/mina/filter/codec/AbstractProtocolEncoderOutput.java b/mina-core/src/main/java/org/apache/mina/filter/codec/AbstractProtocolEncoderOutput.java index e369ba9..58b8852 100644 --- a/mina-core/src/main/java/org/apache/mina/filter/codec/AbstractProtocolEncoderOutput.java +++ b/mina-core/src/main/java/org/apache/mina/filter/codec/AbstractProtocolEncoderOutput.java @@ -19,10 +19,8 @@ */ package org.apache.mina.filter.codec; +import java.util.ArrayDeque; import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; - -import org.apache.mina.core.buffer.IoBuffer; /** * A {@link ProtocolEncoderOutput} based on queue. @@ -30,80 +28,25 @@ import org.apache.mina.core.buffer.IoBuffer; * @author <a href="http://mina.apache.org">Apache MINA Project</a> */ public abstract class AbstractProtocolEncoderOutput implements ProtocolEncoderOutput { - /** The queue where the decoded messages are stored */ - private final Queue<Object> messageQueue = new ConcurrentLinkedQueue<>(); - - private boolean buffersOnly = true; - - /** - * Creates an instance of AbstractProtocolEncoderOutput - */ - public AbstractProtocolEncoderOutput() { - // Do nothing - } - - /** - * @return The message queue - */ - public Queue<Object> getMessageQueue() { - return messageQueue; - } - - /** - * {@inheritDoc} - */ - @Override - public void write(Object encodedMessage) { - if (encodedMessage instanceof IoBuffer) { - IoBuffer buf = (IoBuffer) encodedMessage; - if (buf.hasRemaining()) { - messageQueue.offer(buf); - } else { - throw new IllegalArgumentException("buf is empty. Forgot to call flip()?"); - } - } else { - messageQueue.offer(encodedMessage); - buffersOnly = false; - } - } - - /** - * {@inheritDoc} - */ - @Override - public void mergeAll() { - if (!buffersOnly) { - throw new IllegalStateException("the encoded message list contains a non-buffer."); - } - - final int size = messageQueue.size(); - - if (size < 2) { - // no need to merge! - return; - } - - // Get the size of merged BB - int sum = 0; - for (Object b : messageQueue) { - sum += ((IoBuffer) b).remaining(); - } - - // Allocate a new BB that will contain all fragments - IoBuffer newBuf = IoBuffer.allocate(sum); - - // and merge all. - for (;;) { - IoBuffer buf = (IoBuffer) messageQueue.poll(); - if (buf == null) { - break; - } - - newBuf.put(buf); - } - - // Push the new buffer finally. - newBuf.flip(); - messageQueue.add(newBuf); - } + /** The queue where the decoded messages are stored */ + protected final Queue<Object> messageQueue = new ArrayDeque<>(); + + /** + * Creates an instance of AbstractProtocolEncoderOutput + */ + public AbstractProtocolEncoderOutput() { + // Do nothing + } + + /** + * {@inheritDoc} + */ + @Override + public void write(Object message) { + if (message == null) { + throw new IllegalArgumentException("message"); + } + + messageQueue.offer(message); + } } \ No newline at end of file diff --git a/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java b/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java index a460b3d..93039e8 100644 --- a/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java +++ b/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java @@ -27,13 +27,10 @@ import org.apache.mina.core.file.FileRegion; import org.apache.mina.core.filterchain.IoFilter; import org.apache.mina.core.filterchain.IoFilterAdapter; import org.apache.mina.core.filterchain.IoFilterChain; -import org.apache.mina.core.future.DefaultWriteFuture; import org.apache.mina.core.future.WriteFuture; -import org.apache.mina.core.session.AbstractIoSession; import org.apache.mina.core.session.AttributeKey; import org.apache.mina.core.session.IoSession; import org.apache.mina.core.write.DefaultWriteRequest; -import org.apache.mina.core.write.NothingWrittenException; import org.apache.mina.core.write.WriteRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,509 +44,418 @@ import org.slf4j.LoggerFactory; * @org.apache.xbean.XBean */ public class ProtocolCodecFilter extends IoFilterAdapter { - /** A logger for this class */ - private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolCodecFilter.class); - - private static final Class<?>[] EMPTY_PARAMS = new Class[0]; - - private static final IoBuffer EMPTY_BUFFER = IoBuffer.wrap(new byte[0]); - - private static final AttributeKey ENCODER = new AttributeKey(ProtocolCodecFilter.class, "encoder"); - - private static final AttributeKey DECODER = new AttributeKey(ProtocolCodecFilter.class, "decoder"); - - private static final AttributeKey DECODER_OUT = new AttributeKey(ProtocolCodecFilter.class, "decoderOut"); - - private static final AttributeKey ENCODER_OUT = new AttributeKey(ProtocolCodecFilter.class, "encoderOut"); - - /** The factory responsible for creating the encoder and decoder */ - private final ProtocolCodecFactory factory; - - /** - * Creates a new instance of ProtocolCodecFilter, associating a factory - * for the creation of the encoder and decoder. - * - * @param factory The associated factory - */ - public ProtocolCodecFilter(ProtocolCodecFactory factory) { - if (factory == null) { - throw new IllegalArgumentException("factory"); - } - - this.factory = factory; - } - - /** - * Creates a new instance of ProtocolCodecFilter, without any factory. - * The encoder/decoder factory will be created as an inner class, using - * the two parameters (encoder and decoder). - * - * @param encoder The class responsible for encoding the message - * @param decoder The class responsible for decoding the message - */ - public ProtocolCodecFilter(final ProtocolEncoder encoder, final ProtocolDecoder decoder) { - if (encoder == null) { - throw new IllegalArgumentException("encoder"); - } - if (decoder == null) { - throw new IllegalArgumentException("decoder"); - } - - // Create the inner Factory based on the two parameters - this.factory = new ProtocolCodecFactory() { - /** - * {@inheritDoc} - */ - @Override - public ProtocolEncoder getEncoder(IoSession session) { - return encoder; - } - - /** - * {@inheritDoc} - */ - @Override - public ProtocolDecoder getDecoder(IoSession session) { - return decoder; - } - }; - } - - /** - * Creates a new instance of ProtocolCodecFilter, without any factory. - * The encoder/decoder factory will be created as an inner class, using - * the two parameters (encoder and decoder), which are class names. Instances - * for those classes will be created in this constructor. - * - * @param encoderClass The class responsible for encoding the message - * @param decoderClass The class responsible for decoding the message - */ - public ProtocolCodecFilter(final Class<? extends ProtocolEncoder> encoderClass, - final Class<? extends ProtocolDecoder> decoderClass) { - if (encoderClass == null) { - throw new IllegalArgumentException("encoderClass"); - } - if (decoderClass == null) { - throw new IllegalArgumentException("decoderClass"); - } - if (!ProtocolEncoder.class.isAssignableFrom(encoderClass)) { - throw new IllegalArgumentException("encoderClass: " + encoderClass.getName()); - } - if (!ProtocolDecoder.class.isAssignableFrom(decoderClass)) { - throw new IllegalArgumentException("decoderClass: " + decoderClass.getName()); - } - try { - encoderClass.getConstructor(EMPTY_PARAMS); - } catch (NoSuchMethodException e) { - throw new IllegalArgumentException("encoderClass doesn't have a public default constructor."); - } - try { - decoderClass.getConstructor(EMPTY_PARAMS); - } catch (NoSuchMethodException e) { - throw new IllegalArgumentException("decoderClass doesn't have a public default constructor."); - } - - final ProtocolEncoder encoder; - - try { - encoder = encoderClass.newInstance(); - } catch (Exception e) { - throw new IllegalArgumentException("encoderClass cannot be initialized"); - } - - final ProtocolDecoder decoder; - - try { - decoder = decoderClass.newInstance(); - } catch (Exception e) { - throw new IllegalArgumentException("decoderClass cannot be initialized"); - } - - // Create the inner factory based on the two parameters. - this.factory = new ProtocolCodecFactory() { - /** - * {@inheritDoc} - */ - @Override - public ProtocolEncoder getEncoder(IoSession session) throws Exception { - return encoder; - } - - /** - * {@inheritDoc} - */ - @Override - public ProtocolDecoder getDecoder(IoSession session) throws Exception { - return decoder; - } - }; - } - - /** - * Get the encoder instance from a given session. - * - * @param session The associated session we will get the encoder from - * @return The encoder instance, if any - */ - public ProtocolEncoder getEncoder(IoSession session) { - return (ProtocolEncoder) session.getAttribute(ENCODER); - } - - /** - * {@inheritDoc} - */ - @Override - public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception { - if (parent.contains(this)) { - throw new IllegalArgumentException( - "You can't add the same filter instance more than once. Create another instance and add it."); - } - } - - /** - * {@inheritDoc} - */ - @Override - public void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception { - // Clean everything - disposeCodec(parent.getSession()); - } - - /** - * Process the incoming message, calling the session decoder. As the incoming - * buffer might contains more than one messages, we have to loop until the decoder - * throws an exception. - * - * while ( buffer not empty ) - * try - * decode ( buffer ) - * catch - * break; - * - */ - @Override - public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Processing a MESSAGE_RECEIVED for session {}", session.getId()); - } - - if (!(message instanceof IoBuffer)) { - nextFilter.messageReceived(session, message); - return; - } - - IoBuffer in = (IoBuffer) message; - ProtocolDecoder decoder = factory.getDecoder(session); - ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter); - - // Loop until we don't have anymore byte in the buffer, - // or until the decoder throws an unrecoverable exception or - // can't decoder a message, because there are not enough - // data in the buffer - while (in.hasRemaining()) { - int oldPos = in.position(); - try { - synchronized (session) { - // Call the decoder with the read bytes - decoder.decode(session, in, decoderOut); - } - // Finish decoding if no exception was thrown. - decoderOut.flush(nextFilter, session); - } catch (Exception e) { - ProtocolDecoderException pde; - if (e instanceof ProtocolDecoderException) { - pde = (ProtocolDecoderException) e; - } else { - pde = new ProtocolDecoderException(e); - } - if (pde.getHexdump() == null) { - // Generate a message hex dump - int curPos = in.position(); - in.position(oldPos); - pde.setHexdump(in.getHexDump()); - in.position(curPos); - } - // Fire the exceptionCaught event. - decoderOut.flush(nextFilter, session); - nextFilter.exceptionCaught(session, pde); - // Retry only if the type of the caught exception is - // recoverable and the buffer position has changed. - // We check buffer position additionally to prevent an - // infinite loop. - if (!(e instanceof RecoverableProtocolDecoderException) || (in.position() == oldPos)) { - break; - } - } - } - } - - /** - * {@inheritDoc} - */ - @Override - public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { - if (writeRequest instanceof EncodedWriteRequest) { - return; - } - - nextFilter.messageSent(session, writeRequest); - } - - /** - * {@inheritDoc} - */ - @Override - public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { - Object message = writeRequest.getMessage(); - - // Bypass the encoding if the message is contained in a IoBuffer, - // as it has already been encoded before - if ((message instanceof IoBuffer) || (message instanceof FileRegion)) { - nextFilter.filterWrite(session, writeRequest); - return; - } - - // Get the encoder in the session - ProtocolEncoder encoder = factory.getEncoder(session); - - ProtocolEncoderOutput encoderOut = getEncoderOut(session, nextFilter, writeRequest); - - if (encoder == null) { - throw new ProtocolEncoderException("The encoder is null for the session " + session); - } - - try { - // Now we can try to encode the response - encoder.encode(session, message, encoderOut); - - // Send it directly - Queue<Object> bufferQueue = ((AbstractProtocolEncoderOutput) encoderOut).getMessageQueue(); - - // Write all the encoded messages now - while (!bufferQueue.isEmpty()) { - Object encodedMessage = bufferQueue.poll(); - - if (encodedMessage == null) { - break; - } - - // Flush only when the buffer has remaining. - if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) { - if (bufferQueue.isEmpty()) { - writeRequest.setMessage(encodedMessage); + /** A logger for this class */ + private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolCodecFilter.class); + + private static final Class<?>[] EMPTY_PARAMS = new Class[0]; + + private static final IoBuffer EMPTY_BUFFER = IoBuffer.wrap(new byte[0]); + + private static final AttributeKey ENCODER = new AttributeKey(ProtocolCodecFilter.class, "encoder"); + + private static final AttributeKey DECODER = new AttributeKey(ProtocolCodecFilter.class, "decoder"); + + private static final ProtocolDecoderOutputLocal DECODER_OUTPUT = new ProtocolDecoderOutputLocal(); + + private static final ProtocolEncoderOutputLocal ENCODER_OUTPUT = new ProtocolEncoderOutputLocal(); + + /** The factory responsible for creating the encoder and decoder */ + private final ProtocolCodecFactory factory; + + /** + * Creates a new instance of ProtocolCodecFilter, associating a factory for the + * creation of the encoder and decoder. + * + * @param factory The associated factory + */ + public ProtocolCodecFilter(ProtocolCodecFactory factory) { + if (factory == null) { + throw new IllegalArgumentException("factory"); + } + + this.factory = factory; + } + + /** + * Creates a new instance of ProtocolCodecFilter, without any factory. The + * encoder/decoder factory will be created as an inner class, using the two + * parameters (encoder and decoder). + * + * @param encoder The class responsible for encoding the message + * @param decoder The class responsible for decoding the message + */ + public ProtocolCodecFilter(final ProtocolEncoder encoder, final ProtocolDecoder decoder) { + if (encoder == null) { + throw new IllegalArgumentException("encoder"); + } + if (decoder == null) { + throw new IllegalArgumentException("decoder"); + } + + // Create the inner Factory based on the two parameters + this.factory = new ProtocolCodecFactory() { + /** + * {@inheritDoc} + */ + @Override + public ProtocolEncoder getEncoder(IoSession session) { + return encoder; + } + + /** + * {@inheritDoc} + */ + @Override + public ProtocolDecoder getDecoder(IoSession session) { + return decoder; + } + }; + } + + /** + * Creates a new instance of ProtocolCodecFilter, without any factory. The + * encoder/decoder factory will be created as an inner class, using the two + * parameters (encoder and decoder), which are class names. Instances for those + * classes will be created in this constructor. + * + * @param encoderClass The class responsible for encoding the message + * @param decoderClass The class responsible for decoding the message + */ + public ProtocolCodecFilter(final Class<? extends ProtocolEncoder> encoderClass, + final Class<? extends ProtocolDecoder> decoderClass) { + if (encoderClass == null) { + throw new IllegalArgumentException("encoderClass"); + } + if (decoderClass == null) { + throw new IllegalArgumentException("decoderClass"); + } + if (!ProtocolEncoder.class.isAssignableFrom(encoderClass)) { + throw new IllegalArgumentException("encoderClass: " + encoderClass.getName()); + } + if (!ProtocolDecoder.class.isAssignableFrom(decoderClass)) { + throw new IllegalArgumentException("decoderClass: " + decoderClass.getName()); + } + try { + encoderClass.getConstructor(EMPTY_PARAMS); + } catch (NoSuchMethodException e) { + throw new IllegalArgumentException("encoderClass doesn't have a public default constructor."); + } + try { + decoderClass.getConstructor(EMPTY_PARAMS); + } catch (NoSuchMethodException e) { + throw new IllegalArgumentException("decoderClass doesn't have a public default constructor."); + } + + final ProtocolEncoder encoder; + + try { + encoder = encoderClass.newInstance(); + } catch (Exception e) { + throw new IllegalArgumentException("encoderClass cannot be initialized"); + } + + final ProtocolDecoder decoder; + + try { + decoder = decoderClass.newInstance(); + } catch (Exception e) { + throw new IllegalArgumentException("decoderClass cannot be initialized"); + } + + // Create the inner factory based on the two parameters. + this.factory = new ProtocolCodecFactory() { + /** + * {@inheritDoc} + */ + @Override + public ProtocolEncoder getEncoder(IoSession session) throws Exception { + return encoder; + } + + /** + * {@inheritDoc} + */ + @Override + public ProtocolDecoder getDecoder(IoSession session) throws Exception { + return decoder; + } + }; + } + + /** + * Get the encoder instance from a given session. + * + * @param session The associated session we will get the encoder from + * @return The encoder instance, if any + */ + public ProtocolEncoder getEncoder(IoSession session) { + return (ProtocolEncoder) session.getAttribute(ENCODER); + } + + /** + * {@inheritDoc} + */ + @Override + public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception { + if (parent.contains(this)) { + throw new IllegalArgumentException( + "You can't add the same filter instance more than once. Create another instance and add it."); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception { + // Clean everything + disposeCodec(parent.getSession()); + } + + /** + * Process the incoming message, calling the session decoder. As the incoming + * buffer might contains more than one messages, we have to loop until the + * decoder throws an exception. + * + * while ( buffer not empty ) try decode ( buffer ) catch break; + * + */ + @Override + public void messageReceived(final NextFilter nextFilter, final IoSession session, final Object message) + throws Exception { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Processing a MESSAGE_RECEIVED for session {}", session.getId()); + } + + if (!(message instanceof IoBuffer)) { + nextFilter.messageReceived(session, message); + return; + } + + final IoBuffer in = (IoBuffer) message; + final ProtocolDecoder decoder = factory.getDecoder(session); + final ProtocolDecoderOutputImpl decoderOut = DECODER_OUTPUT.get(); + + // Loop until we don't have anymore byte in the buffer, + // or until the decoder throws an unrecoverable exception or + // can't decoder a message, because there are not enough + // data in the buffer + while (in.hasRemaining()) { + int oldPos = in.position(); + try { + // Call the decoder with the read bytes + decoder.decode(session, in, decoderOut); + // Finish decoding if no exception was thrown. + decoderOut.flush(nextFilter, session); + } catch (Exception e) { + ProtocolDecoderException pde; + if (e instanceof ProtocolDecoderException) { + pde = (ProtocolDecoderException) e; + } else { + pde = new ProtocolDecoderException(e); + } + if (pde.getHexdump() == null) { + // Generate a message hex dump + int curPos = in.position(); + in.position(oldPos); + pde.setHexdump(in.getHexDump()); + in.position(curPos); + } + // Fire the exceptionCaught event. + decoderOut.flush(nextFilter, session); + nextFilter.exceptionCaught(session, pde); + // Retry only if the type of the caught exception is + // recoverable and the buffer position has changed. + // We check buffer position additionally to prevent an + // infinite loop. + if (!(e instanceof RecoverableProtocolDecoderException) || (in.position() == oldPos)) { + break; + } + } + } + } + + /** + * {@inheritDoc} + */ + @Override + public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { + if (writeRequest instanceof EncodedWriteRequest) { + return; + } + + nextFilter.messageSent(session, writeRequest); + } + + /** + * {@inheritDoc} + */ + @Override + public void filterWrite(final NextFilter nextFilter, final IoSession session, final WriteRequest writeRequest) + throws Exception { + final Object message = writeRequest.getMessage(); + + // Bypass the encoding if the message is contained in a IoBuffer, + // as it has already been encoded before + if ((message instanceof IoBuffer) || (message instanceof FileRegion)) { nextFilter.filterWrite(session, writeRequest); - } else { - SocketAddress destination = writeRequest.getDestination(); - WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination); - nextFilter.filterWrite(session, encodedWriteRequest); - } + return; + } + + // Get the encoder in the session + final ProtocolEncoder encoder = factory.getEncoder(session); + final ProtocolEncoderOutputImpl encoderOut = ENCODER_OUTPUT.get(); + + if (encoder == null) { + throw new ProtocolEncoderException("The encoder is null for the session " + session); + } + + try { + // Now we can try to encode the response + encoder.encode(session, message, encoderOut); + + final Queue<Object> queue = encoderOut.messageQueue; + + if (queue.isEmpty()) { + // Write empty message to ensure that messageSent is fired later + writeRequest.setMessage(EMPTY_BUFFER); + nextFilter.filterWrite(session, writeRequest); + } else { + // Write all the encoded messages now + Object encodedMessage = null; + + while ((encodedMessage = queue.poll()) != null) { + if (queue.isEmpty()) { + // Write last message using original WriteRequest to ensure that any Future and + // dependency on messageSent event is emitted correctly + writeRequest.setMessage(encodedMessage); + nextFilter.filterWrite(session, writeRequest); + } else { + SocketAddress destination = writeRequest.getDestination(); + WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination); + nextFilter.filterWrite(session, encodedWriteRequest); + } + } + } + } catch (final ProtocolEncoderException e) { + throw e; + } catch (final Exception e) { + // Generate the correct exception + throw new ProtocolEncoderException(e); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception { + // Call finishDecode() first when a connection is closed. + ProtocolDecoder decoder = factory.getDecoder(session); + ProtocolDecoderOutput decoderOut = DECODER_OUTPUT.get(); + + try { + decoder.finishDecode(session, decoderOut); + } catch (Exception e) { + ProtocolDecoderException pde; + if (e instanceof ProtocolDecoderException) { + pde = (ProtocolDecoderException) e; + } else { + pde = new ProtocolDecoderException(e); + } + throw pde; + } finally { + // Dispose everything + disposeCodec(session); + decoderOut.flush(nextFilter, session); + } + + // Call the next filter + nextFilter.sessionClosed(session); + } + + private static class EncodedWriteRequest extends DefaultWriteRequest { + public EncodedWriteRequest(Object encodedMessage, WriteFuture future, SocketAddress destination) { + super(encodedMessage, future, destination); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isEncoded() { + return true; + } + } + + private static class ProtocolDecoderOutputImpl extends AbstractProtocolDecoderOutput { + public ProtocolDecoderOutputImpl() { + // Do nothing + } + } + + private static class ProtocolEncoderOutputImpl extends AbstractProtocolEncoderOutput { + public ProtocolEncoderOutputImpl() { + // Do nothing + } + } + + // ----------- Helper methods --------------------------------------------- + /** + * Dispose the encoder, decoder, and the callback for the decoded messages. + */ + private void disposeCodec(IoSession session) { + // We just remove the two instances of encoder/decoder to release resources + // from the session + disposeEncoder(session); + disposeDecoder(session); + } + + /** + * Dispose the encoder, removing its instance from the session's attributes, and + * calling the associated dispose method. + */ + private void disposeEncoder(IoSession session) { + ProtocolEncoder encoder = (ProtocolEncoder) session.removeAttribute(ENCODER); + if (encoder == null) { + return; + } + + try { + encoder.dispose(session); + } catch (Exception e) { + LOGGER.warn("Failed to dispose: " + encoder.getClass().getName() + " (" + encoder + ')'); + } + } + + /** + * Dispose the decoder, removing its instance from the session's attributes, and + * calling the associated dispose method. + */ + private void disposeDecoder(IoSession session) { + ProtocolDecoder decoder = (ProtocolDecoder) session.removeAttribute(DECODER); + if (decoder == null) { + return; + } + + try { + decoder.dispose(session); + } catch (Exception e) { + LOGGER.warn("Failed to dispose: " + decoder.getClass().getName() + " (" + decoder + ')'); + } + } + + static private class ProtocolDecoderOutputLocal extends ThreadLocal<ProtocolDecoderOutputImpl> { + @Override + protected ProtocolDecoderOutputImpl initialValue() { + return new ProtocolDecoderOutputImpl(); + } + } + + static private class ProtocolEncoderOutputLocal extends ThreadLocal<ProtocolEncoderOutputImpl> { + @Override + protected ProtocolEncoderOutputImpl initialValue() { + return new ProtocolEncoderOutputImpl(); } - } - } catch (Exception e) { - ProtocolEncoderException pee; - - // Generate the correct exception - if (e instanceof ProtocolEncoderException) { - pee = (ProtocolEncoderException) e; - } else { - pee = new ProtocolEncoderException(e); - } - - throw pee; - } - } - - /** - * {@inheritDoc} - */ - @Override - public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception { - // Call finishDecode() first when a connection is closed. - ProtocolDecoder decoder = factory.getDecoder(session); - ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter); - - try { - decoder.finishDecode(session, decoderOut); - } catch (Exception e) { - ProtocolDecoderException pde; - if (e instanceof ProtocolDecoderException) { - pde = (ProtocolDecoderException) e; - } else { - pde = new ProtocolDecoderException(e); - } - throw pde; - } finally { - // Dispose everything - disposeCodec(session); - decoderOut.flush(nextFilter, session); - } - - // Call the next filter - nextFilter.sessionClosed(session); - } - - private static class EncodedWriteRequest extends DefaultWriteRequest { - public EncodedWriteRequest(Object encodedMessage, WriteFuture future, SocketAddress destination) { - super(encodedMessage, future, destination); - } - - /** - * {@inheritDoc} - */ - @Override - public boolean isEncoded() { - return true; - } - } - - private static class ProtocolDecoderOutputImpl extends AbstractProtocolDecoderOutput { - public ProtocolDecoderOutputImpl() { - // Do nothing - } - - /** - * {@inheritDoc} - */ - @Override - public void flush(NextFilter nextFilter, IoSession session) { - Queue<Object> messageQueue = getMessageQueue(); - - while (!messageQueue.isEmpty()) { - nextFilter.messageReceived(session, messageQueue.poll()); - } - } - } - - private static class ProtocolEncoderOutputImpl extends AbstractProtocolEncoderOutput { - private final IoSession session; - - private final NextFilter nextFilter; - - /** The WriteRequest destination */ - private final SocketAddress destination; - - public ProtocolEncoderOutputImpl(IoSession session, NextFilter nextFilter, WriteRequest writeRequest) { - this.session = session; - this.nextFilter = nextFilter; - - // Only store the destination, not the full WriteRequest. - destination = writeRequest.getDestination(); - } - - /** - * {@inheritDoc} - */ - @Override - public WriteFuture flush() { - Queue<Object> bufferQueue = getMessageQueue(); - WriteFuture future = null; - - while (!bufferQueue.isEmpty()) { - Object encodedMessage = bufferQueue.poll(); - - if (encodedMessage == null) { - break; - } - - // Flush only when the buffer has remaining. - if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) { - future = new DefaultWriteFuture(session); - nextFilter.filterWrite(session, new EncodedWriteRequest(encodedMessage, future, destination)); - } - } - - if (future == null) { - // Creates an empty writeRequest containing the destination - future = DefaultWriteFuture.newNotWrittenFuture(session, new NothingWrittenException(AbstractIoSession.MESSAGE_SENT_REQUEST)); - } - - return future; - } - } - - //----------- Helper methods --------------------------------------------- - /** - * Dispose the encoder, decoder, and the callback for the decoded - * messages. - */ - private void disposeCodec(IoSession session) { - // We just remove the two instances of encoder/decoder to release resources - // from the session - disposeEncoder(session); - disposeDecoder(session); - - // We also remove the callback - disposeDecoderOut(session); - } - - /** - * Dispose the encoder, removing its instance from the - * session's attributes, and calling the associated - * dispose method. - */ - private void disposeEncoder(IoSession session) { - ProtocolEncoder encoder = (ProtocolEncoder) session.removeAttribute(ENCODER); - if (encoder == null) { - return; - } - - try { - encoder.dispose(session); - } catch (Exception e) { - LOGGER.warn("Failed to dispose: " + encoder.getClass().getName() + " (" + encoder + ')'); - } - } - - /** - * Dispose the decoder, removing its instance from the - * session's attributes, and calling the associated - * dispose method. - */ - private void disposeDecoder(IoSession session) { - ProtocolDecoder decoder = (ProtocolDecoder) session.removeAttribute(DECODER); - if (decoder == null) { - return; - } - - try { - decoder.dispose(session); - } catch (Exception e) { - LOGGER.warn("Failed to dispose: " + decoder.getClass().getName() + " (" + decoder + ')'); - } - } - - /** - * Return a reference to the decoder callback. If it's not already created - * and stored into the session, we create a new instance. - */ - private ProtocolDecoderOutput getDecoderOut(IoSession session, NextFilter nextFilter) { - ProtocolDecoderOutput out = (ProtocolDecoderOutput) session.getAttribute(DECODER_OUT); - - if (out == null) { - // Create a new instance, and stores it into the session - out = new ProtocolDecoderOutputImpl(); - session.setAttribute(DECODER_OUT, out); - } - - return out; - } - - private ProtocolEncoderOutput getEncoderOut(IoSession session, NextFilter nextFilter, WriteRequest writeRequest) { - ProtocolEncoderOutput out = (ProtocolEncoderOutput) session.getAttribute(ENCODER_OUT); - - if (out == null) { - // Create a new instance, and stores it into the session - out = new ProtocolEncoderOutputImpl(session, nextFilter, writeRequest); - session.setAttribute(ENCODER_OUT, out); - } - - return out; - } - - /** - * Remove the decoder callback from the session's attributes. - */ - private void disposeDecoderOut(IoSession session) { - session.removeAttribute(DECODER_OUT); - } + } } diff --git a/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecSession.java b/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecSession.java index 2b5f89c..1638491 100644 --- a/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecSession.java +++ b/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecSession.java @@ -59,17 +59,8 @@ import org.apache.mina.core.session.IoSession; */ public class ProtocolCodecSession extends DummySession { - private final WriteFuture notWrittenFuture = DefaultWriteFuture.newNotWrittenFuture(this, - new UnsupportedOperationException()); - private final AbstractProtocolEncoderOutput encoderOutput = new AbstractProtocolEncoderOutput() { - /** - * {@inheritDoc} - */ - @Override - public WriteFuture flush() { - return notWrittenFuture; - } + }; private final AbstractProtocolDecoderOutput decoderOutput = new AbstractProtocolDecoderOutput() { @@ -101,7 +92,7 @@ public class ProtocolCodecSession extends DummySession { * @return the {@link Queue} of the buffered encoder output. */ public Queue<Object> getEncoderOutputQueue() { - return encoderOutput.getMessageQueue(); + return encoderOutput.messageQueue; } /** @@ -116,6 +107,6 @@ public class ProtocolCodecSession extends DummySession { * @return the {@link Queue} of the buffered decoder output. */ public Queue<Object> getDecoderOutputQueue() { - return decoderOutput.getMessageQueue(); + return decoderOutput.messageQueue; } } diff --git a/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolEncoderOutput.java b/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolEncoderOutput.java index 0fc847c..508ee23 100644 --- a/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolEncoderOutput.java +++ b/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolEncoderOutput.java @@ -21,44 +21,22 @@ package org.apache.mina.filter.codec; import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.file.FileRegion; -import org.apache.mina.core.future.WriteFuture; /** * Callback for {@link ProtocolEncoder} to generate encoded messages such as - * {@link IoBuffer}s. {@link ProtocolEncoder} must call {@link #write(Object)} + * {@link IoBuffer}s. {@link ProtocolEncoder} must call {@link #write(Object)} * for each encoded message. * * @author <a href="http://mina.apache.org">Apache MINA Project</a> */ public interface ProtocolEncoderOutput { - /** - * Callback for {@link ProtocolEncoder} to generate an encoded message such - * as an {@link IoBuffer}. {@link ProtocolEncoder} must call - * {@link #write(Object)} for each encoded message. - * - * @param encodedMessage the encoded message, typically an {@link IoBuffer} - * or a {@link FileRegion}. - */ - void write(Object encodedMessage); - - /** - * Merges all buffers you wrote via {@link #write(Object)} into - * one {@link IoBuffer} and replaces the old fragmented ones with it. - * This method is useful when you want to control the way MINA generates - * network packets. Please note that this method only works when you - * called {@link #write(Object)} method with only {@link IoBuffer}s. - * - * @throws IllegalStateException if you wrote something else than {@link IoBuffer} - */ - void mergeAll(); - - /** - * Flushes all buffers you wrote via {@link #write(Object)} to - * the session. This operation is asynchronous; please wait for - * the returned {@link WriteFuture} if you want to wait for - * the buffers flushed. - * - * @return <tt>null</tt> if there is nothing to flush at all. - */ - WriteFuture flush(); + /** + * Callback for {@link ProtocolEncoder} to generate an encoded message such as + * an {@link IoBuffer}. {@link ProtocolEncoder} must call {@link #write(Object)} + * for each encoded message. + * + * @param message the encoded message, typically an {@link IoBuffer} or a + * {@link FileRegion}. + */ + void write(Object message); } \ No newline at end of file diff --git a/mina-http/src/test/java/org/apache/mina/http/HttpServerDecoderTest.java b/mina-http/src/test/java/org/apache/mina/http/HttpServerDecoderTest.java index 87b886d..f8497b8 100644 --- a/mina-http/src/test/java/org/apache/mina/http/HttpServerDecoderTest.java +++ b/mina-http/src/test/java/org/apache/mina/http/HttpServerDecoderTest.java @@ -25,9 +25,9 @@ import static org.junit.Assert.assertTrue; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; import java.nio.charset.CharsetEncoder; +import java.util.Queue; import org.apache.mina.core.buffer.IoBuffer; -import org.apache.mina.core.filterchain.IoFilter.NextFilter; import org.apache.mina.core.session.DummySession; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.AbstractProtocolDecoderOutput; @@ -37,262 +37,250 @@ import org.apache.mina.http.api.HttpRequest; import org.junit.Test; public class HttpServerDecoderTest { - private static final CharsetEncoder encoder = Charset.forName("US-ASCII").newEncoder(); //$NON-NLS-1$ + private static final CharsetEncoder encoder = Charset.forName("US-ASCII").newEncoder(); //$NON-NLS-1$ - private static final ProtocolDecoder decoder = new HttpServerDecoder(); + private static final ProtocolDecoder decoder = new HttpServerDecoder(); - /* - * Use a single session for all requests in order to test state management better - */ - private static IoSession session = new DummySession(); + /* + * Use a single session for all requests in order to test state management + * better + */ + private static IoSession session = new DummySession(); - /** - * Build an IO buffer containing a simple minimal HTTP request. - * - * @param method the HTTP method - * @param body the option body - * @return the built IO buffer - * @throws CharacterCodingException if encoding fails - */ - protected static IoBuffer getRequestBuffer(String method, String body) throws CharacterCodingException { - IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true); - buffer.putString(method + " / HTTP/1.1\r\nHost: dummy\r\n", encoder); - - if (body != null) { - buffer.putString("Content-Length: " + body.length() + "\r\n\r\n", encoder); - buffer.putString(body, encoder); - } else { - buffer.putString("\r\n", encoder); - } - - buffer.rewind(); - - return buffer; - } + /** + * Build an IO buffer containing a simple minimal HTTP request. + * + * @param method the HTTP method + * @param body the option body + * @return the built IO buffer + * @throws CharacterCodingException if encoding fails + */ + protected static IoBuffer getRequestBuffer(String method, String body) throws CharacterCodingException { + IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true); + buffer.putString(method + " / HTTP/1.1\r\nHost: dummy\r\n", encoder); - protected static IoBuffer getRequestBuffer(String method) throws CharacterCodingException { - return getRequestBuffer(method, null); - } + if (body != null) { + buffer.putString("Content-Length: " + body.length() + "\r\n\r\n", encoder); + buffer.putString(body, encoder); + } else { + buffer.putString("\r\n", encoder); + } - /** - * Execute an HTPP request and return the queue of messages. - * - * @param method the HTTP method - * @param body the optional body - * @return the protocol output and its queue of messages - * @throws Exception if error occurs (encoding,...) - */ - protected static AbstractProtocolDecoderOutput executeRequest(String method, String body) throws Exception { - AbstractProtocolDecoderOutput out = new AbstractProtocolDecoderOutput() { - public void flush(NextFilter nextFilter, IoSession session) { - } - }; + buffer.rewind(); - IoBuffer buffer = getRequestBuffer(method, body); //$NON-NLS-1$ - - while (buffer.hasRemaining()) { - decoder.decode(session, buffer, out); - } - - return out; - } + return buffer; + } - @Test - public void testGetRequestWithoutBody() throws Exception { - AbstractProtocolDecoderOutput out = executeRequest("GET", null); - assertEquals(2, out.getMessageQueue().size()); - assertTrue(out.getMessageQueue().poll() instanceof HttpRequest); - assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent); - } + protected static IoBuffer getRequestBuffer(String method) throws CharacterCodingException { + return getRequestBuffer(method, null); + } - @Test - public void testGetRequestBody() throws Exception { - AbstractProtocolDecoderOutput out = executeRequest("GET", "body"); - assertEquals(3, out.getMessageQueue().size()); - assertTrue(out.getMessageQueue().poll() instanceof HttpRequest); - assertTrue(out.getMessageQueue().poll() instanceof IoBuffer); - assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent); - } + protected static class ProtocolDecoderQueue extends AbstractProtocolDecoderOutput { + public Queue<Object> getQueue() { + return this.messageQueue; + } + } - @Test - public void testPutRequestWithoutBody() throws Exception { - AbstractProtocolDecoderOutput out = executeRequest("PUT", null); - assertEquals(2, out.getMessageQueue().size()); - assertTrue(out.getMessageQueue().poll() instanceof HttpRequest); - assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent); - } + /** + * Execute an HTPP request and return the queue of messages. + * + * @param method the HTTP method + * @param body the optional body + * @return the protocol output and its queue of messages + * @throws Exception if error occurs (encoding,...) + */ + protected static ProtocolDecoderQueue executeRequest(String method, String body) throws Exception { + ProtocolDecoderQueue out = new ProtocolDecoderQueue(); - @Test - public void testPutRequestBody() throws Exception { - AbstractProtocolDecoderOutput out = executeRequest("PUT", "body"); - assertEquals(3, out.getMessageQueue().size()); - assertTrue(out.getMessageQueue().poll() instanceof HttpRequest); - assertTrue(out.getMessageQueue().poll() instanceof IoBuffer); - assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent); - } + IoBuffer buffer = getRequestBuffer(method, body); // $NON-NLS-1$ - @Test - public void testPostRequestWithoutBody() throws Exception { - AbstractProtocolDecoderOutput out = executeRequest("POST", null); - assertEquals(2, out.getMessageQueue().size()); - assertTrue(out.getMessageQueue().poll() instanceof HttpRequest); - assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent); - } + while (buffer.hasRemaining()) { + decoder.decode(session, buffer, out); + } - @Test - public void testPostRequestBody() throws Exception { - AbstractProtocolDecoderOutput out = executeRequest("POST", "body"); - assertEquals(3, out.getMessageQueue().size()); - assertTrue(out.getMessageQueue().poll() instanceof HttpRequest); - assertTrue(out.getMessageQueue().poll() instanceof IoBuffer); - assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent); - } + return out; + } - @Test - public void testDeleteRequestWithoutBody() throws Exception { - AbstractProtocolDecoderOutput out = executeRequest("DELETE", null); - assertEquals(2, out.getMessageQueue().size()); - assertTrue(out.getMessageQueue().poll() instanceof HttpRequest); - assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent); - } + @Test + public void testGetRequestWithoutBody() throws Exception { + ProtocolDecoderQueue out = executeRequest("GET", null); + assertEquals(2, out.getQueue().size()); + assertTrue(out.getQueue().poll() instanceof HttpRequest); + assertTrue(out.getQueue().poll() instanceof HttpEndOfContent); + } - @Test - public void testDeleteRequestBody() throws Exception { - AbstractProtocolDecoderOutput out = executeRequest("DELETE", "body"); - assertEquals(3, out.getMessageQueue().size()); - assertTrue(out.getMessageQueue().poll() instanceof HttpRequest); - assertTrue(out.getMessageQueue().poll() instanceof IoBuffer); - assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent); - } - - @Test - public void testDIRMINA965NoContent() throws Exception { - AbstractProtocolDecoderOutput out = new AbstractProtocolDecoderOutput() { - public void flush(NextFilter nextFilter, IoSession session) { - } - }; - IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true); - buffer.putString("GET / HTTP/1.1\r\nHost: ", encoder); - buffer.rewind(); - while (buffer.hasRemaining()) { - decoder.decode(session, buffer, out); - } - buffer = IoBuffer.allocate(0).setAutoExpand(true); - buffer.putString("dummy\r\n\r\n", encoder); - buffer.rewind(); - while (buffer.hasRemaining()) { - decoder.decode(session, buffer, out); - } - assertEquals(2, out.getMessageQueue().size()); - assertTrue(out.getMessageQueue().poll() instanceof HttpRequest); - assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent); - } + @Test + public void testGetRequestBody() throws Exception { + ProtocolDecoderQueue out = executeRequest("GET", "body"); + assertEquals(3, out.getQueue().size()); + assertTrue(out.getQueue().poll() instanceof HttpRequest); + assertTrue(out.getQueue().poll() instanceof IoBuffer); + assertTrue(out.getQueue().poll() instanceof HttpEndOfContent); + } - @Test - public void testDIRMINA965WithContent() throws Exception { - AbstractProtocolDecoderOutput out = new AbstractProtocolDecoderOutput() { - public void flush(NextFilter nextFilter, IoSession session) { - } - }; - IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true); - buffer.putString("GET / HTTP/1.1\r\nHost: ", encoder); - buffer.rewind(); - while (buffer.hasRemaining()) { - decoder.decode(session, buffer, out); - } - buffer = IoBuffer.allocate(0).setAutoExpand(true); - buffer.putString("dummy\r\nContent-Length: 1\r\n\r\nA", encoder); - buffer.rewind(); - while (buffer.hasRemaining()) { - decoder.decode(session, buffer, out); - } - assertEquals(3, out.getMessageQueue().size()); - assertTrue(out.getMessageQueue().poll() instanceof HttpRequest); - assertTrue(out.getMessageQueue().poll() instanceof IoBuffer); - assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent); - } - @Test - public void testDIRMINA965WithContentOnTwoChunks() throws Exception { - AbstractProtocolDecoderOutput out = new AbstractProtocolDecoderOutput() { - public void flush(NextFilter nextFilter, IoSession session) { - } - }; - IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true); - buffer.putString("GET / HTTP/1.1\r\nHost: ", encoder); - buffer.rewind(); - while (buffer.hasRemaining()) { - decoder.decode(session, buffer, out); - } - buffer = IoBuffer.allocate(0).setAutoExpand(true); - buffer.putString("dummy\r\nContent-Length: 2\r\n\r\nA", encoder); - buffer.rewind(); - while (buffer.hasRemaining()) { - decoder.decode(session, buffer, out); - } - buffer = IoBuffer.allocate(0).setAutoExpand(true); - buffer.putString("B", encoder); - buffer.rewind(); - while (buffer.hasRemaining()) { - decoder.decode(session, buffer, out); - } - assertEquals(4, out.getMessageQueue().size()); - assertTrue(out.getMessageQueue().poll() instanceof HttpRequest); - assertTrue(out.getMessageQueue().poll() instanceof IoBuffer); - assertTrue(out.getMessageQueue().poll() instanceof IoBuffer); - assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent); - } - - @Test - public void verifyThatHeaderWithoutLeadingSpaceIsSupported() throws Exception { - AbstractProtocolDecoderOutput out = new AbstractProtocolDecoderOutput() { - public void flush(NextFilter nextFilter, IoSession session) { - } - }; - IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true); - buffer.putString("GET / HTTP/1.0\r\nHost:localhost\r\n\r\n", encoder); - buffer.rewind(); - while (buffer.hasRemaining()) { - decoder.decode(session, buffer, out); - } - assertEquals(2, out.getMessageQueue().size()); - HttpRequest request = (HttpRequest) out.getMessageQueue().poll(); - assertEquals("localhost", request.getHeader("host")); - assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent); - } + @Test + public void testPutRequestWithoutBody() throws Exception { + ProtocolDecoderQueue out = executeRequest("PUT", null); + assertEquals(2, out.getQueue().size()); + assertTrue(out.getQueue().poll() instanceof HttpRequest); + assertTrue(out.getQueue().poll() instanceof HttpEndOfContent); + } - @Test - public void verifyThatLeadingSpacesAreRemovedFromHeader() throws Exception { - AbstractProtocolDecoderOutput out = new AbstractProtocolDecoderOutput() { - public void flush(NextFilter nextFilter, IoSession session) { - } - }; - IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true); - buffer.putString("GET / HTTP/1.0\r\nHost: localhost\r\n\r\n", encoder); - buffer.rewind(); - while (buffer.hasRemaining()) { - decoder.decode(session, buffer, out); - } - assertEquals(2, out.getMessageQueue().size()); - HttpRequest request = (HttpRequest) out.getMessageQueue().poll(); - assertEquals("localhost", request.getHeader("host")); - assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent); - } + @Test + public void testPutRequestBody() throws Exception { + ProtocolDecoderQueue out = executeRequest("PUT", "body"); + assertEquals(3, out.getQueue().size()); + assertTrue(out.getQueue().poll() instanceof HttpRequest); + assertTrue(out.getQueue().poll() instanceof IoBuffer); + assertTrue(out.getQueue().poll() instanceof HttpEndOfContent); + } - @Test - public void verifyThatTrailingSpacesAreRemovedFromHeader() throws Exception { - AbstractProtocolDecoderOutput out = new AbstractProtocolDecoderOutput() { - public void flush(NextFilter nextFilter, IoSession session) { - } - }; - IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true); - buffer.putString("GET / HTTP/1.0\r\nHost:localhost \r\n\r\n", encoder); - buffer.rewind(); - while (buffer.hasRemaining()) { - decoder.decode(session, buffer, out); - } - assertEquals(2, out.getMessageQueue().size()); - HttpRequest request = (HttpRequest) out.getMessageQueue().poll(); - assertEquals("localhost", request.getHeader("host")); - assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent); - } + @Test + public void testPostRequestWithoutBody() throws Exception { + ProtocolDecoderQueue out = executeRequest("POST", null); + assertEquals(2, out.getQueue().size()); + assertTrue(out.getQueue().poll() instanceof HttpRequest); + assertTrue(out.getQueue().poll() instanceof HttpEndOfContent); + } + + @Test + public void testPostRequestBody() throws Exception { + ProtocolDecoderQueue out = executeRequest("POST", "body"); + assertEquals(3, out.getQueue().size()); + assertTrue(out.getQueue().poll() instanceof HttpRequest); + assertTrue(out.getQueue().poll() instanceof IoBuffer); + assertTrue(out.getQueue().poll() instanceof HttpEndOfContent); + } + + @Test + public void testDeleteRequestWithoutBody() throws Exception { + ProtocolDecoderQueue out = executeRequest("DELETE", null); + assertEquals(2, out.getQueue().size()); + assertTrue(out.getQueue().poll() instanceof HttpRequest); + assertTrue(out.getQueue().poll() instanceof HttpEndOfContent); + } + + @Test + public void testDeleteRequestBody() throws Exception { + ProtocolDecoderQueue out = executeRequest("DELETE", "body"); + assertEquals(3, out.getQueue().size()); + assertTrue(out.getQueue().poll() instanceof HttpRequest); + assertTrue(out.getQueue().poll() instanceof IoBuffer); + assertTrue(out.getQueue().poll() instanceof HttpEndOfContent); + } + + @Test + public void testDIRMINA965NoContent() throws Exception { + ProtocolDecoderQueue out = new ProtocolDecoderQueue(); + IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true); + buffer.putString("GET / HTTP/1.1\r\nHost: ", encoder); + buffer.rewind(); + while (buffer.hasRemaining()) { + decoder.decode(session, buffer, out); + } + buffer = IoBuffer.allocate(0).setAutoExpand(true); + buffer.putString("dummy\r\n\r\n", encoder); + buffer.rewind(); + while (buffer.hasRemaining()) { + decoder.decode(session, buffer, out); + } + assertEquals(2, out.getQueue().size()); + assertTrue(out.getQueue().poll() instanceof HttpRequest); + assertTrue(out.getQueue().poll() instanceof HttpEndOfContent); + } + + @Test + public void testDIRMINA965WithContent() throws Exception { + ProtocolDecoderQueue out = new ProtocolDecoderQueue(); + IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true); + buffer.putString("GET / HTTP/1.1\r\nHost: ", encoder); + buffer.rewind(); + while (buffer.hasRemaining()) { + decoder.decode(session, buffer, out); + } + buffer = IoBuffer.allocate(0).setAutoExpand(true); + buffer.putString("dummy\r\nContent-Length: 1\r\n\r\nA", encoder); + buffer.rewind(); + while (buffer.hasRemaining()) { + decoder.decode(session, buffer, out); + } + + assertEquals(3, out.getQueue().size()); + assertTrue(out.getQueue().poll() instanceof HttpRequest); + assertTrue(out.getQueue().poll() instanceof IoBuffer); + assertTrue(out.getQueue().poll() instanceof HttpEndOfContent); + } + + @Test + public void testDIRMINA965WithContentOnTwoChunks() throws Exception { + ProtocolDecoderQueue out = new ProtocolDecoderQueue(); + IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true); + buffer.putString("GET / HTTP/1.1\r\nHost: ", encoder); + buffer.rewind(); + while (buffer.hasRemaining()) { + decoder.decode(session, buffer, out); + } + buffer = IoBuffer.allocate(0).setAutoExpand(true); + buffer.putString("dummy\r\nContent-Length: 2\r\n\r\nA", encoder); + buffer.rewind(); + while (buffer.hasRemaining()) { + decoder.decode(session, buffer, out); + } + buffer = IoBuffer.allocate(0).setAutoExpand(true); + buffer.putString("B", encoder); + buffer.rewind(); + while (buffer.hasRemaining()) { + decoder.decode(session, buffer, out); + } + assertEquals(4, out.getQueue().size()); + assertTrue(out.getQueue().poll() instanceof HttpRequest); + assertTrue(out.getQueue().poll() instanceof IoBuffer); + assertTrue(out.getQueue().poll() instanceof IoBuffer); + assertTrue(out.getQueue().poll() instanceof HttpEndOfContent); + } + + @Test + public void verifyThatHeaderWithoutLeadingSpaceIsSupported() throws Exception { + ProtocolDecoderQueue out = new ProtocolDecoderQueue(); + IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true); + buffer.putString("GET / HTTP/1.0\r\nHost:localhost\r\n\r\n", encoder); + buffer.rewind(); + while (buffer.hasRemaining()) { + decoder.decode(session, buffer, out); + } + assertEquals(2, out.getQueue().size()); + HttpRequest request = (HttpRequest) out.getQueue().poll(); + assertEquals("localhost", request.getHeader("host")); + assertTrue(out.getQueue().poll() instanceof HttpEndOfContent); + } + + @Test + public void verifyThatLeadingSpacesAreRemovedFromHeader() throws Exception { + ProtocolDecoderQueue out = new ProtocolDecoderQueue(); + IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true); + buffer.putString("GET / HTTP/1.0\r\nHost: localhost\r\n\r\n", encoder); + buffer.rewind(); + while (buffer.hasRemaining()) { + decoder.decode(session, buffer, out); + } + assertEquals(2, out.getQueue().size()); + HttpRequest request = (HttpRequest) out.getQueue().poll(); + assertEquals("localhost", request.getHeader("host")); + assertTrue(out.getQueue().poll() instanceof HttpEndOfContent); + } + + @Test + public void verifyThatTrailingSpacesAreRemovedFromHeader() throws Exception { + ProtocolDecoderQueue out = new ProtocolDecoderQueue(); + IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true); + buffer.putString("GET / HTTP/1.0\r\nHost:localhost \r\n\r\n", encoder); + buffer.rewind(); + while (buffer.hasRemaining()) { + decoder.decode(session, buffer, out); + } + assertEquals(2, out.getQueue().size()); + HttpRequest request = (HttpRequest) out.getQueue().poll(); + assertEquals("localhost", request.getHeader("host")); + assertTrue(out.getQueue().poll() instanceof HttpEndOfContent); + } }