This is an automated email from the ASF dual-hosted git repository.

johnnyv pushed a commit to branch bugfix/DIRMINA-1142
in repository https://gitbox.apache.org/repos/asf/mina.git

commit 024f23db166c83dcd359221f6baea2c0d0505db2
Author: Jonathan Valliere <john...@apache.org>
AuthorDate: Tue Jun 1 20:39:37 2021 -0400

    initial work on parallel codec api
---
 .../codec/AbstractProtocolDecoderOutput.java       |  66 +-
 .../codec/AbstractProtocolEncoderOutput.java       | 101 +--
 .../mina/filter/codec/ProtocolCodecFilter.java     | 918 +++++++++------------
 .../mina/filter/codec/ProtocolCodecSession.java    |  15 +-
 .../mina/filter/codec/ProtocolEncoderOutput.java   |  42 +-
 .../apache/mina/http/HttpServerDecoderTest.java    | 472 ++++++-----
 6 files changed, 714 insertions(+), 900 deletions(-)

diff --git a/mina-core/src/main/java/org/apache/mina/filter/codec/AbstractProtocolDecoderOutput.java b/mina-core/src/main/java/org/apache/mina/filter/codec/AbstractProtocolDecoderOutput.java
index 23a54c0..2997e6a 100644
--- a/mina-core/src/main/java/org/apache/mina/filter/codec/AbstractProtocolDecoderOutput.java
+++ b/mina-core/src/main/java/org/apache/mina/filter/codec/AbstractProtocolDecoderOutput.java
@@ -19,41 +19,49 @@
  */
 package org.apache.mina.filter.codec;
 
-import java.util.LinkedList;
+import java.util.ArrayDeque;
 import java.util.Queue;
 
+import org.apache.mina.core.filterchain.IoFilter.NextFilter;
+import org.apache.mina.core.session.IoSession;
+
 /**
  * A {@link ProtocolDecoderOutput} based on queue.
  *
  * @author <a href="http://mina.apache.org">Apache MINA Project</a>
  */
 public abstract class AbstractProtocolDecoderOutput implements ProtocolDecoderOutput {
-    /** The queue where decoded messages are stored */
-    private final Queue<Object> messageQueue = new LinkedList<>();
-
-    /**
-     * Creates a new instance of a AbstractProtocolDecoderOutput
-     */
-    public AbstractProtocolDecoderOutput() {
-        // Do nothing
-    }
-
-    /**
-     * @return The decoder's message queue
-     */
-    public Queue<Object> getMessageQueue() {
-        return messageQueue;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void write(Object message) {
-        if (message == null) {
-            throw new IllegalArgumentException("message");
-        }
-
-        messageQueue.add(message);
-    }
+       /** The queue where decoded messages are stored */
+       protected final Queue<Object> messageQueue = new ArrayDeque<>();
+
+       /**
+        * Creates a new instance of a AbstractProtocolDecoderOutput
+        */
+       public AbstractProtocolDecoderOutput() {
+               // Do nothing
+       }
+
+       /**
+        * {@inheritDoc}
+        */
+       @Override
+       public void write(Object message) {
+               if (message == null) {
+                       throw new IllegalArgumentException("message");
+               }
+
+               messageQueue.add(message);
+       }
+
+       /**
+        * {@inheritDoc}
+        */
+       @Override
+       public void flush(NextFilter nextFilter, IoSession session) {
+               Object message = null;
+
+               while ((message = messageQueue.poll()) != null) {
+                       nextFilter.messageReceived(session, message);
+               }
+       }
 }
diff --git a/mina-core/src/main/java/org/apache/mina/filter/codec/AbstractProtocolEncoderOutput.java b/mina-core/src/main/java/org/apache/mina/filter/codec/AbstractProtocolEncoderOutput.java
index e369ba9..58b8852 100644
--- a/mina-core/src/main/java/org/apache/mina/filter/codec/AbstractProtocolEncoderOutput.java
+++ b/mina-core/src/main/java/org/apache/mina/filter/codec/AbstractProtocolEncoderOutput.java
@@ -19,10 +19,8 @@
  */
 package org.apache.mina.filter.codec;
 
+import java.util.ArrayDeque;
 import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import org.apache.mina.core.buffer.IoBuffer;
 
 /**
  * A {@link ProtocolEncoderOutput} based on queue.
@@ -30,80 +28,25 @@ import org.apache.mina.core.buffer.IoBuffer;
  * @author <a href="http://mina.apache.org">Apache MINA Project</a>
  */
 public abstract class AbstractProtocolEncoderOutput implements ProtocolEncoderOutput {
-    /** The queue where the decoded messages are stored */
-    private final Queue<Object> messageQueue = new ConcurrentLinkedQueue<>();
-
-    private boolean buffersOnly = true;
-
-    /**
-     * Creates an instance of AbstractProtocolEncoderOutput
-     */
-    public AbstractProtocolEncoderOutput() {
-        // Do nothing
-    }
-
-    /**
-     * @return The message queue
-     */
-    public Queue<Object> getMessageQueue() {
-        return messageQueue;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void write(Object encodedMessage) {
-        if (encodedMessage instanceof IoBuffer) {
-            IoBuffer buf = (IoBuffer) encodedMessage;
-            if (buf.hasRemaining()) {
-                messageQueue.offer(buf);
-            } else {
-                throw new IllegalArgumentException("buf is empty. Forgot to 
call flip()?");
-            }
-        } else {
-            messageQueue.offer(encodedMessage);
-            buffersOnly = false;
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void mergeAll() {
-        if (!buffersOnly) {
-            throw new IllegalStateException("the encoded message list contains 
a non-buffer.");
-        }
-
-        final int size = messageQueue.size();
-
-        if (size < 2) {
-            // no need to merge!
-            return;
-        }
-
-        // Get the size of merged BB
-        int sum = 0;
-        for (Object b : messageQueue) {
-            sum += ((IoBuffer) b).remaining();
-        }
-
-        // Allocate a new BB that will contain all fragments
-        IoBuffer newBuf = IoBuffer.allocate(sum);
-
-        // and merge all.
-        for (;;) {
-            IoBuffer buf = (IoBuffer) messageQueue.poll();
-            if (buf == null) {
-                break;
-            }
-
-            newBuf.put(buf);
-        }
-
-        // Push the new buffer finally.
-        newBuf.flip();
-        messageQueue.add(newBuf);
-    }
+       /** The queue where the decoded messages are stored */
+       protected final Queue<Object> messageQueue = new ArrayDeque<>();
+
+       /**
+        * Creates an instance of AbstractProtocolEncoderOutput
+        */
+       public AbstractProtocolEncoderOutput() {
+               // Do nothing
+       }
+
+       /**
+        * {@inheritDoc}
+        */
+       @Override
+       public void write(Object message) {
+               if (message == null) {
+                       throw new IllegalArgumentException("message");
+               }
+
+               messageQueue.offer(message);
+       }
 }
\ No newline at end of file
diff --git a/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java b/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
index a460b3d..93039e8 100644
--- a/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
+++ b/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
@@ -27,13 +27,10 @@ import org.apache.mina.core.file.FileRegion;
 import org.apache.mina.core.filterchain.IoFilter;
 import org.apache.mina.core.filterchain.IoFilterAdapter;
 import org.apache.mina.core.filterchain.IoFilterChain;
-import org.apache.mina.core.future.DefaultWriteFuture;
 import org.apache.mina.core.future.WriteFuture;
-import org.apache.mina.core.session.AbstractIoSession;
 import org.apache.mina.core.session.AttributeKey;
 import org.apache.mina.core.session.IoSession;
 import org.apache.mina.core.write.DefaultWriteRequest;
-import org.apache.mina.core.write.NothingWrittenException;
 import org.apache.mina.core.write.WriteRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,509 +44,418 @@ import org.slf4j.LoggerFactory;
  * @org.apache.xbean.XBean
  */
 public class ProtocolCodecFilter extends IoFilterAdapter {
-    /** A logger for this class */
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(ProtocolCodecFilter.class);
-
-    private static final Class<?>[] EMPTY_PARAMS = new Class[0];
-
-    private static final IoBuffer EMPTY_BUFFER = IoBuffer.wrap(new byte[0]);
-
-    private static final AttributeKey ENCODER = new 
AttributeKey(ProtocolCodecFilter.class, "encoder");
-
-    private static final AttributeKey DECODER = new 
AttributeKey(ProtocolCodecFilter.class, "decoder");
-
-    private static final AttributeKey DECODER_OUT = new 
AttributeKey(ProtocolCodecFilter.class, "decoderOut");
-
-    private static final AttributeKey ENCODER_OUT = new 
AttributeKey(ProtocolCodecFilter.class, "encoderOut");
-
-    /** The factory responsible for creating the encoder and decoder */
-    private final ProtocolCodecFactory factory;
-
-    /**
-     * Creates a new instance of ProtocolCodecFilter, associating a factory
-     * for the creation of the encoder and decoder.
-     *
-     * @param factory The associated factory
-     */
-    public ProtocolCodecFilter(ProtocolCodecFactory factory) {
-        if (factory == null) {
-            throw new IllegalArgumentException("factory");
-        }
-
-        this.factory = factory;
-    }
-
-    /**
-     * Creates a new instance of ProtocolCodecFilter, without any factory.
-     * The encoder/decoder factory will be created as an inner class, using
-     * the two parameters (encoder and decoder).
-     * 
-     * @param encoder The class responsible for encoding the message
-     * @param decoder The class responsible for decoding the message
-     */
-    public ProtocolCodecFilter(final ProtocolEncoder encoder, final 
ProtocolDecoder decoder) {
-        if (encoder == null) {
-            throw new IllegalArgumentException("encoder");
-        }
-        if (decoder == null) {
-            throw new IllegalArgumentException("decoder");
-        }
-
-        // Create the inner Factory based on the two parameters
-        this.factory = new ProtocolCodecFactory() {
-            /**
-             * {@inheritDoc}
-             */
-            @Override
-            public ProtocolEncoder getEncoder(IoSession session) {
-                return encoder;
-            }
-
-            /**
-             * {@inheritDoc}
-             */
-            @Override
-            public ProtocolDecoder getDecoder(IoSession session) {
-                return decoder;
-            }
-        };
-    }
-
-    /**
-     * Creates a new instance of ProtocolCodecFilter, without any factory.
-     * The encoder/decoder factory will be created as an inner class, using
-     * the two parameters (encoder and decoder), which are class names. 
Instances
-     * for those classes will be created in this constructor.
-     * 
-     * @param encoderClass The class responsible for encoding the message
-     * @param decoderClass The class responsible for decoding the message
-     */
-    public ProtocolCodecFilter(final Class<? extends ProtocolEncoder> 
encoderClass,
-            final Class<? extends ProtocolDecoder> decoderClass) {
-        if (encoderClass == null) {
-            throw new IllegalArgumentException("encoderClass");
-        }
-        if (decoderClass == null) {
-            throw new IllegalArgumentException("decoderClass");
-        }
-        if (!ProtocolEncoder.class.isAssignableFrom(encoderClass)) {
-            throw new IllegalArgumentException("encoderClass: " + 
encoderClass.getName());
-        }
-        if (!ProtocolDecoder.class.isAssignableFrom(decoderClass)) {
-            throw new IllegalArgumentException("decoderClass: " + 
decoderClass.getName());
-        }
-        try {
-            encoderClass.getConstructor(EMPTY_PARAMS);
-        } catch (NoSuchMethodException e) {
-            throw new IllegalArgumentException("encoderClass doesn't have a 
public default constructor.");
-        }
-        try {
-            decoderClass.getConstructor(EMPTY_PARAMS);
-        } catch (NoSuchMethodException e) {
-            throw new IllegalArgumentException("decoderClass doesn't have a 
public default constructor.");
-        }
-
-        final ProtocolEncoder encoder;
-
-        try {
-            encoder = encoderClass.newInstance();
-        } catch (Exception e) {
-            throw new IllegalArgumentException("encoderClass cannot be 
initialized");
-        }
-
-        final ProtocolDecoder decoder;
-
-        try {
-            decoder = decoderClass.newInstance();
-        } catch (Exception e) {
-            throw new IllegalArgumentException("decoderClass cannot be 
initialized");
-        }
-
-        // Create the inner factory based on the two parameters.
-        this.factory = new ProtocolCodecFactory() {
-            /**
-             * {@inheritDoc}
-             */
-            @Override
-            public ProtocolEncoder getEncoder(IoSession session) throws 
Exception {
-                return encoder;
-            }
-
-            /**
-             * {@inheritDoc}
-             */
-            @Override
-            public ProtocolDecoder getDecoder(IoSession session) throws 
Exception {
-                return decoder;
-            }
-        };
-    }
-
-    /**
-     * Get the encoder instance from a given session.
-     *
-     * @param session The associated session we will get the encoder from
-     * @return The encoder instance, if any
-     */
-    public ProtocolEncoder getEncoder(IoSession session) {
-        return (ProtocolEncoder) session.getAttribute(ENCODER);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void onPreAdd(IoFilterChain parent, String name, NextFilter 
nextFilter) throws Exception {
-        if (parent.contains(this)) {
-            throw new IllegalArgumentException(
-                    "You can't add the same filter instance more than once.  
Create another instance and add it.");
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void onPostRemove(IoFilterChain parent, String name, NextFilter 
nextFilter) throws Exception {
-        // Clean everything
-        disposeCodec(parent.getSession());
-    }
-
-    /**
-     * Process the incoming message, calling the session decoder. As the 
incoming
-     * buffer might contains more than one messages, we have to loop until the 
decoder
-     * throws an exception.
-     * 
-     *  while ( buffer not empty )
-     *    try
-     *      decode ( buffer )
-     *    catch
-     *      break;
-     * 
-     */
-    @Override
-    public void messageReceived(NextFilter nextFilter, IoSession session, 
Object message) throws Exception {
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("Processing a MESSAGE_RECEIVED for session {}", 
session.getId());
-        }
-
-        if (!(message instanceof IoBuffer)) {
-            nextFilter.messageReceived(session, message);
-            return;
-        }
-
-        IoBuffer in = (IoBuffer) message;
-        ProtocolDecoder decoder = factory.getDecoder(session);
-        ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
-
-        // Loop until we don't have anymore byte in the buffer,
-        // or until the decoder throws an unrecoverable exception or
-        // can't decoder a message, because there are not enough
-        // data in the buffer
-        while (in.hasRemaining()) {
-            int oldPos = in.position();
-            try {
-                synchronized (session) {
-                    // Call the decoder with the read bytes
-                    decoder.decode(session, in, decoderOut);
-                }
-                // Finish decoding if no exception was thrown.
-                decoderOut.flush(nextFilter, session);
-            } catch (Exception e) {
-                ProtocolDecoderException pde;
-                if (e instanceof ProtocolDecoderException) {
-                    pde = (ProtocolDecoderException) e;
-                } else {
-                    pde = new ProtocolDecoderException(e);
-                }
-                if (pde.getHexdump() == null) {
-                    // Generate a message hex dump
-                    int curPos = in.position();
-                    in.position(oldPos);
-                    pde.setHexdump(in.getHexDump());
-                    in.position(curPos);
-                }
-                // Fire the exceptionCaught event.
-                decoderOut.flush(nextFilter, session);
-                nextFilter.exceptionCaught(session, pde);
-                // Retry only if the type of the caught exception is
-                // recoverable and the buffer position has changed.
-                // We check buffer position additionally to prevent an
-                // infinite loop.
-                if (!(e instanceof RecoverableProtocolDecoderException) || 
(in.position() == oldPos)) {
-                    break;
-                }
-            }
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void messageSent(NextFilter nextFilter, IoSession session, 
WriteRequest writeRequest) throws Exception {
-        if (writeRequest instanceof EncodedWriteRequest) {
-            return;
-        }
-
-        nextFilter.messageSent(session, writeRequest);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void filterWrite(NextFilter nextFilter, IoSession session, 
WriteRequest writeRequest) throws Exception {
-        Object message = writeRequest.getMessage();
-
-        // Bypass the encoding if the message is contained in a IoBuffer,
-        // as it has already been encoded before
-        if ((message instanceof IoBuffer) || (message instanceof FileRegion)) {
-            nextFilter.filterWrite(session, writeRequest);
-            return;
-        }
-
-        // Get the encoder in the session
-        ProtocolEncoder encoder = factory.getEncoder(session);
-
-        ProtocolEncoderOutput encoderOut = getEncoderOut(session, nextFilter, 
writeRequest);
-
-        if (encoder == null) {
-            throw new ProtocolEncoderException("The encoder is null for the 
session " + session);
-        }
-
-        try {
-            // Now we can try to encode the response
-            encoder.encode(session, message, encoderOut);
-
-            // Send it directly
-            Queue<Object> bufferQueue = ((AbstractProtocolEncoderOutput) 
encoderOut).getMessageQueue();
-
-            // Write all the encoded messages now
-            while (!bufferQueue.isEmpty()) {
-                Object encodedMessage = bufferQueue.poll();
-
-                if (encodedMessage == null) {
-                    break;
-                }
-
-               // Flush only when the buffer has remaining.
-               if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) 
encodedMessage).hasRemaining()) {
-                   if (bufferQueue.isEmpty()) {
-                       writeRequest.setMessage(encodedMessage);
+       /** A logger for this class */
+       private static final Logger LOGGER = 
LoggerFactory.getLogger(ProtocolCodecFilter.class);
+
+       private static final Class<?>[] EMPTY_PARAMS = new Class[0];
+
+       private static final IoBuffer EMPTY_BUFFER = IoBuffer.wrap(new byte[0]);
+
+       private static final AttributeKey ENCODER = new 
AttributeKey(ProtocolCodecFilter.class, "encoder");
+
+       private static final AttributeKey DECODER = new 
AttributeKey(ProtocolCodecFilter.class, "decoder");
+
+       private static final ProtocolDecoderOutputLocal DECODER_OUTPUT = new 
ProtocolDecoderOutputLocal();
+
+       private static final ProtocolEncoderOutputLocal ENCODER_OUTPUT = new 
ProtocolEncoderOutputLocal();
+
+       /** The factory responsible for creating the encoder and decoder */
+       private final ProtocolCodecFactory factory;
+
+       /**
+        * Creates a new instance of ProtocolCodecFilter, associating a factory 
for the
+        * creation of the encoder and decoder.
+        *
+        * @param factory The associated factory
+        */
+       public ProtocolCodecFilter(ProtocolCodecFactory factory) {
+               if (factory == null) {
+                       throw new IllegalArgumentException("factory");
+               }
+
+               this.factory = factory;
+       }
+
+       /**
+        * Creates a new instance of ProtocolCodecFilter, without any factory. 
The
+        * encoder/decoder factory will be created as an inner class, using the 
two
+        * parameters (encoder and decoder).
+        * 
+        * @param encoder The class responsible for encoding the message
+        * @param decoder The class responsible for decoding the message
+        */
+       public ProtocolCodecFilter(final ProtocolEncoder encoder, final 
ProtocolDecoder decoder) {
+               if (encoder == null) {
+                       throw new IllegalArgumentException("encoder");
+               }
+               if (decoder == null) {
+                       throw new IllegalArgumentException("decoder");
+               }
+
+               // Create the inner Factory based on the two parameters
+               this.factory = new ProtocolCodecFactory() {
+                       /**
+                        * {@inheritDoc}
+                        */
+                       @Override
+                       public ProtocolEncoder getEncoder(IoSession session) {
+                               return encoder;
+                       }
+
+                       /**
+                        * {@inheritDoc}
+                        */
+                       @Override
+                       public ProtocolDecoder getDecoder(IoSession session) {
+                               return decoder;
+                       }
+               };
+       }
+
+       /**
+        * Creates a new instance of ProtocolCodecFilter, without any factory. 
The
+        * encoder/decoder factory will be created as an inner class, using the 
two
+        * parameters (encoder and decoder), which are class names. Instances 
for those
+        * classes will be created in this constructor.
+        * 
+        * @param encoderClass The class responsible for encoding the message
+        * @param decoderClass The class responsible for decoding the message
+        */
+       public ProtocolCodecFilter(final Class<? extends ProtocolEncoder> 
encoderClass,
+                       final Class<? extends ProtocolDecoder> decoderClass) {
+               if (encoderClass == null) {
+                       throw new IllegalArgumentException("encoderClass");
+               }
+               if (decoderClass == null) {
+                       throw new IllegalArgumentException("decoderClass");
+               }
+               if (!ProtocolEncoder.class.isAssignableFrom(encoderClass)) {
+                       throw new IllegalArgumentException("encoderClass: " + 
encoderClass.getName());
+               }
+               if (!ProtocolDecoder.class.isAssignableFrom(decoderClass)) {
+                       throw new IllegalArgumentException("decoderClass: " + 
decoderClass.getName());
+               }
+               try {
+                       encoderClass.getConstructor(EMPTY_PARAMS);
+               } catch (NoSuchMethodException e) {
+                       throw new IllegalArgumentException("encoderClass 
doesn't have a public default constructor.");
+               }
+               try {
+                       decoderClass.getConstructor(EMPTY_PARAMS);
+               } catch (NoSuchMethodException e) {
+                       throw new IllegalArgumentException("decoderClass 
doesn't have a public default constructor.");
+               }
+
+               final ProtocolEncoder encoder;
+
+               try {
+                       encoder = encoderClass.newInstance();
+               } catch (Exception e) {
+                       throw new IllegalArgumentException("encoderClass cannot 
be initialized");
+               }
+
+               final ProtocolDecoder decoder;
+
+               try {
+                       decoder = decoderClass.newInstance();
+               } catch (Exception e) {
+                       throw new IllegalArgumentException("decoderClass cannot 
be initialized");
+               }
+
+               // Create the inner factory based on the two parameters.
+               this.factory = new ProtocolCodecFactory() {
+                       /**
+                        * {@inheritDoc}
+                        */
+                       @Override
+                       public ProtocolEncoder getEncoder(IoSession session) 
throws Exception {
+                               return encoder;
+                       }
+
+                       /**
+                        * {@inheritDoc}
+                        */
+                       @Override
+                       public ProtocolDecoder getDecoder(IoSession session) 
throws Exception {
+                               return decoder;
+                       }
+               };
+       }
+
+       /**
+        * Get the encoder instance from a given session.
+        *
+        * @param session The associated session we will get the encoder from
+        * @return The encoder instance, if any
+        */
+       public ProtocolEncoder getEncoder(IoSession session) {
+               return (ProtocolEncoder) session.getAttribute(ENCODER);
+       }
+
+       /**
+        * {@inheritDoc}
+        */
+       @Override
+       public void onPreAdd(IoFilterChain parent, String name, NextFilter 
nextFilter) throws Exception {
+               if (parent.contains(this)) {
+                       throw new IllegalArgumentException(
+                                       "You can't add the same filter instance 
more than once.  Create another instance and add it.");
+               }
+       }
+
+       /**
+        * {@inheritDoc}
+        */
+       @Override
+       public void onPostRemove(IoFilterChain parent, String name, NextFilter 
nextFilter) throws Exception {
+               // Clean everything
+               disposeCodec(parent.getSession());
+       }
+
+       /**
+        * Process the incoming message, calling the session decoder. As the 
incoming
+        * buffer might contains more than one messages, we have to loop until 
the
+        * decoder throws an exception.
+        * 
+        * while ( buffer not empty ) try decode ( buffer ) catch break;
+        * 
+        */
+       @Override
+       public void messageReceived(final NextFilter nextFilter, final 
IoSession session, final Object message)
+                       throws Exception {
+               if (LOGGER.isDebugEnabled()) {
+                       LOGGER.debug("Processing a MESSAGE_RECEIVED for session 
{}", session.getId());
+               }
+
+               if (!(message instanceof IoBuffer)) {
+                       nextFilter.messageReceived(session, message);
+                       return;
+               }
+
+               final IoBuffer in = (IoBuffer) message;
+               final ProtocolDecoder decoder = factory.getDecoder(session);
+               final ProtocolDecoderOutputImpl decoderOut = 
DECODER_OUTPUT.get();
+
+               // Loop until we don't have anymore byte in the buffer,
+               // or until the decoder throws an unrecoverable exception or
+               // can't decoder a message, because there are not enough
+               // data in the buffer
+               while (in.hasRemaining()) {
+                       int oldPos = in.position();
+                       try {
+                               // Call the decoder with the read bytes
+                               decoder.decode(session, in, decoderOut);
+                               // Finish decoding if no exception was thrown.
+                               decoderOut.flush(nextFilter, session);
+                       } catch (Exception e) {
+                               ProtocolDecoderException pde;
+                               if (e instanceof ProtocolDecoderException) {
+                                       pde = (ProtocolDecoderException) e;
+                               } else {
+                                       pde = new ProtocolDecoderException(e);
+                               }
+                               if (pde.getHexdump() == null) {
+                                       // Generate a message hex dump
+                                       int curPos = in.position();
+                                       in.position(oldPos);
+                                       pde.setHexdump(in.getHexDump());
+                                       in.position(curPos);
+                               }
+                               // Fire the exceptionCaught event.
+                               decoderOut.flush(nextFilter, session);
+                               nextFilter.exceptionCaught(session, pde);
+                               // Retry only if the type of the caught 
exception is
+                               // recoverable and the buffer position has 
changed.
+                               // We check buffer position additionally to 
prevent an
+                               // infinite loop.
+                               if (!(e instanceof 
RecoverableProtocolDecoderException) || (in.position() == oldPos)) {
+                                       break;
+                               }
+                       }
+               }
+       }
+
+       /**
+        * {@inheritDoc}
+        */
+       @Override
+       public void messageSent(NextFilter nextFilter, IoSession session, 
WriteRequest writeRequest) throws Exception {
+               if (writeRequest instanceof EncodedWriteRequest) {
+                       return;
+               }
+
+               nextFilter.messageSent(session, writeRequest);
+       }
+
+       /**
+        * {@inheritDoc}
+        */
+       @Override
+       public void filterWrite(final NextFilter nextFilter, final IoSession 
session, final WriteRequest writeRequest)
+                       throws Exception {
+               final Object message = writeRequest.getMessage();
+
+               // Bypass the encoding if the message is contained in a 
IoBuffer,
+               // as it has already been encoded before
+               if ((message instanceof IoBuffer) || (message instanceof 
FileRegion)) {
                        nextFilter.filterWrite(session, writeRequest);
-                   } else {
-                       SocketAddress destination = 
writeRequest.getDestination();
-                       WriteRequest encodedWriteRequest = new 
EncodedWriteRequest(encodedMessage, null, destination);
-                       nextFilter.filterWrite(session, encodedWriteRequest);
-                   }
+                       return;
+               }
+
+               // Get the encoder in the session
+               final ProtocolEncoder encoder = factory.getEncoder(session);
+               final ProtocolEncoderOutputImpl encoderOut = 
ENCODER_OUTPUT.get();
+
+               if (encoder == null) {
+                       throw new ProtocolEncoderException("The encoder is null 
for the session " + session);
+               }
+
+               try {
+                       // Now we can try to encode the response
+                       encoder.encode(session, message, encoderOut);
+
+                       final Queue<Object> queue = encoderOut.messageQueue;
+
+                       if (queue.isEmpty()) {
+                               // Write empty message to ensure that 
messageSent is fired later
+                               writeRequest.setMessage(EMPTY_BUFFER);
+                               nextFilter.filterWrite(session, writeRequest);
+                       } else {
+                               // Write all the encoded messages now
+                               Object encodedMessage = null;
+
+                               while ((encodedMessage = queue.poll()) != null) 
{
+                                       if (queue.isEmpty()) {
+                                               // Write last message using 
original WriteRequest to ensure that any Future and
+                                               // dependency on messageSent 
event is emitted correctly
+                                               
writeRequest.setMessage(encodedMessage);
+                                               nextFilter.filterWrite(session, 
writeRequest);
+                                       } else {
+                                               SocketAddress destination = 
writeRequest.getDestination();
+                                               WriteRequest 
encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, 
destination);
+                                               nextFilter.filterWrite(session, 
encodedWriteRequest);
+                                       }
+                               }
+                       }
+               } catch (final ProtocolEncoderException e) {
+                       throw e;
+               } catch (final Exception e) {
+                       // Generate the correct exception
+                       throw new ProtocolEncoderException(e);
+               }
+       }
+
+       /**
+        * {@inheritDoc}
+        */
+       @Override
+       public void sessionClosed(NextFilter nextFilter, IoSession session) 
throws Exception {
+               // Call finishDecode() first when a connection is closed.
+               ProtocolDecoder decoder = factory.getDecoder(session);
+               ProtocolDecoderOutput decoderOut = DECODER_OUTPUT.get();
+
+               try {
+                       decoder.finishDecode(session, decoderOut);
+               } catch (Exception e) {
+                       ProtocolDecoderException pde;
+                       if (e instanceof ProtocolDecoderException) {
+                               pde = (ProtocolDecoderException) e;
+                       } else {
+                               pde = new ProtocolDecoderException(e);
+                       }
+                       throw pde;
+               } finally {
+                       // Dispose everything
+                       disposeCodec(session);
+                       decoderOut.flush(nextFilter, session);
+               }
+
+               // Call the next filter
+               nextFilter.sessionClosed(session);
+       }
+
+       private static class EncodedWriteRequest extends DefaultWriteRequest {
+               public EncodedWriteRequest(Object encodedMessage, WriteFuture 
future, SocketAddress destination) {
+                       super(encodedMessage, future, destination);
+               }
+
+               /**
+                * {@inheritDoc}
+                */
+               @Override
+               public boolean isEncoded() {
+                       return true;
+               }
+       }
+
+       private static class ProtocolDecoderOutputImpl extends 
AbstractProtocolDecoderOutput {
+               public ProtocolDecoderOutputImpl() {
+                       // Do nothing
+               }
+       }
+
+       private static class ProtocolEncoderOutputImpl extends 
AbstractProtocolEncoderOutput {
+               public ProtocolEncoderOutputImpl() {
+                       // Do nothing
+               }
+       }
+
+       // ----------- Helper methods 
---------------------------------------------
+       /**
+        * Dispose the encoder and the decoder held in the session's 
attributes.
+        */
+       private void disposeCodec(IoSession session) {
+               // We just remove the two instances of encoder/decoder to 
release resources
+               // from the session
+               disposeEncoder(session);
+               disposeDecoder(session);
+       }
+
+       /**
+        * Dispose the encoder, removing its instance from the session's 
attributes, and
+        * calling the associated dispose method.
+        */
+       private void disposeEncoder(IoSession session) {
+               ProtocolEncoder encoder = (ProtocolEncoder) 
session.removeAttribute(ENCODER);
+               if (encoder == null) {
+                       return;
+               }
+
+               try {
+                       encoder.dispose(session);
+               } catch (Exception e) { // best effort: report the failure with its cause
+                       LOGGER.warn("Failed to dispose: " + 
encoder.getClass().getName() + " (" + encoder + ')', e);
+               }
+       }
+
+       /**
+        * Dispose the decoder, removing its instance from the session's 
attributes, and
+        * calling the associated dispose method.
+        */
+       private void disposeDecoder(IoSession session) {
+               ProtocolDecoder decoder = (ProtocolDecoder) 
session.removeAttribute(DECODER);
+               if (decoder == null) {
+                       return;
+               }
+
+               try {
+                       decoder.dispose(session);
+               } catch (Exception e) { // best effort: report the failure with its cause
+                       LOGGER.warn("Failed to dispose: " + 
decoder.getClass().getName() + " (" + decoder + ')', e);
+               }
+       }
+
+       static private class ProtocolDecoderOutputLocal extends 
ThreadLocal<ProtocolDecoderOutputImpl> {
+               @Override
+               protected ProtocolDecoderOutputImpl initialValue() {
+                       return new ProtocolDecoderOutputImpl();
+               }
+       }
+
+       static private class ProtocolEncoderOutputLocal extends 
ThreadLocal<ProtocolEncoderOutputImpl> {
+               @Override
+               protected ProtocolEncoderOutputImpl initialValue() {
+                       return new ProtocolEncoderOutputImpl();
                }
-           }
-        } catch (Exception e) {
-            ProtocolEncoderException pee;
-
-            // Generate the correct exception
-            if (e instanceof ProtocolEncoderException) {
-                pee = (ProtocolEncoderException) e;
-            } else {
-                pee = new ProtocolEncoderException(e);
-            }
-
-            throw pee;
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void sessionClosed(NextFilter nextFilter, IoSession session) throws 
Exception {
-        // Call finishDecode() first when a connection is closed.
-        ProtocolDecoder decoder = factory.getDecoder(session);
-        ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
-
-        try {
-            decoder.finishDecode(session, decoderOut);
-        } catch (Exception e) {
-            ProtocolDecoderException pde;
-            if (e instanceof ProtocolDecoderException) {
-                pde = (ProtocolDecoderException) e;
-            } else {
-                pde = new ProtocolDecoderException(e);
-            }
-            throw pde;
-        } finally {
-            // Dispose everything
-            disposeCodec(session);
-            decoderOut.flush(nextFilter, session);
-        }
-
-        // Call the next filter
-        nextFilter.sessionClosed(session);
-    }
-
-    private static class EncodedWriteRequest extends DefaultWriteRequest {
-        public EncodedWriteRequest(Object encodedMessage, WriteFuture future, 
SocketAddress destination) {
-            super(encodedMessage, future, destination);
-        }
-
-        /**
-         * {@inheritDoc}
-         */
-        @Override
-        public boolean isEncoded() {
-            return true;
-        }
-    }
-
-    private static class ProtocolDecoderOutputImpl extends 
AbstractProtocolDecoderOutput {
-        public ProtocolDecoderOutputImpl() {
-            // Do nothing
-        }
-
-        /**
-         * {@inheritDoc}
-         */
-        @Override
-        public void flush(NextFilter nextFilter, IoSession session) {
-            Queue<Object> messageQueue = getMessageQueue();
-
-            while (!messageQueue.isEmpty()) {
-                nextFilter.messageReceived(session, messageQueue.poll());
-            }
-        }
-    }
-
-    private static class ProtocolEncoderOutputImpl extends 
AbstractProtocolEncoderOutput {
-        private final IoSession session;
-
-        private final NextFilter nextFilter;
-
-        /** The WriteRequest destination */
-        private final SocketAddress destination;
-
-        public ProtocolEncoderOutputImpl(IoSession session, NextFilter 
nextFilter, WriteRequest writeRequest) {
-            this.session = session;
-            this.nextFilter = nextFilter;
-
-            // Only store the destination, not the full WriteRequest.
-            destination = writeRequest.getDestination();
-        }
-
-        /**
-         * {@inheritDoc}
-         */
-        @Override
-        public WriteFuture flush() {
-            Queue<Object> bufferQueue = getMessageQueue();
-            WriteFuture future = null;
-
-            while (!bufferQueue.isEmpty()) {
-                Object encodedMessage = bufferQueue.poll();
-
-                if (encodedMessage == null) {
-                    break;
-                }
-
-                // Flush only when the buffer has remaining.
-                if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) 
encodedMessage).hasRemaining()) {
-                    future = new DefaultWriteFuture(session);
-                    nextFilter.filterWrite(session, new 
EncodedWriteRequest(encodedMessage, future, destination));
-                }
-            }
-
-            if (future == null) {
-                // Creates an empty writeRequest containing the destination
-                future = DefaultWriteFuture.newNotWrittenFuture(session, new 
NothingWrittenException(AbstractIoSession.MESSAGE_SENT_REQUEST));
-            }
-
-            return future;
-        }
-    }
-
-    //----------- Helper methods ---------------------------------------------
-    /**
-     * Dispose the encoder, decoder, and the callback for the decoded
-     * messages.
-     */
-    private void disposeCodec(IoSession session) {
-        // We just remove the two instances of encoder/decoder to release 
resources
-        // from the session
-        disposeEncoder(session);
-        disposeDecoder(session);
-
-        // We also remove the callback
-        disposeDecoderOut(session);
-    }
-
-    /**
-     * Dispose the encoder, removing its instance from the
-     * session's attributes, and calling the associated
-     * dispose method.
-     */
-    private void disposeEncoder(IoSession session) {
-        ProtocolEncoder encoder = (ProtocolEncoder) 
session.removeAttribute(ENCODER);
-        if (encoder == null) {
-            return;
-        }
-
-        try {
-            encoder.dispose(session);
-        } catch (Exception e) {
-            LOGGER.warn("Failed to dispose: " + encoder.getClass().getName() + 
" (" + encoder + ')');
-        }
-    }
-
-    /**
-     * Dispose the decoder, removing its instance from the
-     * session's attributes, and calling the associated
-     * dispose method.
-     */
-    private void disposeDecoder(IoSession session) {
-        ProtocolDecoder decoder = (ProtocolDecoder) 
session.removeAttribute(DECODER);
-        if (decoder == null) {
-            return;
-        }
-
-        try {
-            decoder.dispose(session);
-        } catch (Exception e) {
-            LOGGER.warn("Failed to dispose: " + decoder.getClass().getName() + 
" (" + decoder + ')');
-        }
-    }
-
-    /**
-     * Return a reference to the decoder callback. If it's not already created
-     * and stored into the session, we create a new instance.
-     */
-    private ProtocolDecoderOutput getDecoderOut(IoSession session, NextFilter 
nextFilter) {
-        ProtocolDecoderOutput out = (ProtocolDecoderOutput) 
session.getAttribute(DECODER_OUT);
-
-        if (out == null) {
-            // Create a new instance, and stores it into the session
-            out = new ProtocolDecoderOutputImpl();
-            session.setAttribute(DECODER_OUT, out);
-        }
-
-        return out;
-    }
-
-    private ProtocolEncoderOutput getEncoderOut(IoSession session, NextFilter 
nextFilter, WriteRequest writeRequest) {
-        ProtocolEncoderOutput out = (ProtocolEncoderOutput) 
session.getAttribute(ENCODER_OUT);
-
-        if (out == null) {
-            // Create a new instance, and stores it into the session
-            out = new ProtocolEncoderOutputImpl(session, nextFilter, 
writeRequest);
-            session.setAttribute(ENCODER_OUT, out);
-        }
-
-        return out;
-    }
-
-    /**
-     * Remove the decoder callback from the session's attributes.
-     */
-    private void disposeDecoderOut(IoSession session) {
-        session.removeAttribute(DECODER_OUT);
-    }
+       }
 }
diff --git 
a/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecSession.java
 
b/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecSession.java
index 2b5f89c..1638491 100644
--- 
a/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecSession.java
+++ 
b/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecSession.java
@@ -59,17 +59,8 @@ import org.apache.mina.core.session.IoSession;
  */
 public class ProtocolCodecSession extends DummySession {
 
-    private final WriteFuture notWrittenFuture = 
DefaultWriteFuture.newNotWrittenFuture(this,
-            new UnsupportedOperationException());
-
     private final AbstractProtocolEncoderOutput encoderOutput = new 
AbstractProtocolEncoderOutput() {
-        /**
-         * {@inheritDoc}
-         */
-        @Override
-        public WriteFuture flush() {
-            return notWrittenFuture;
-        }
+
     };
 
     private final AbstractProtocolDecoderOutput decoderOutput = new 
AbstractProtocolDecoderOutput() {
@@ -101,7 +92,7 @@ public class ProtocolCodecSession extends DummySession {
      * @return the {@link Queue} of the buffered encoder output.
      */
     public Queue<Object> getEncoderOutputQueue() {
-        return encoderOutput.getMessageQueue();
+        return encoderOutput.messageQueue;
     }
 
     /**
@@ -116,6 +107,6 @@ public class ProtocolCodecSession extends DummySession {
      * @return the {@link Queue} of the buffered decoder output.
      */
     public Queue<Object> getDecoderOutputQueue() {
-        return decoderOutput.getMessageQueue();
+        return decoderOutput.messageQueue;
     }
 }
diff --git 
a/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolEncoderOutput.java
 
b/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolEncoderOutput.java
index 0fc847c..508ee23 100644
--- 
a/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolEncoderOutput.java
+++ 
b/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolEncoderOutput.java
@@ -21,44 +21,22 @@ package org.apache.mina.filter.codec;
 
 import org.apache.mina.core.buffer.IoBuffer;
 import org.apache.mina.core.file.FileRegion;
-import org.apache.mina.core.future.WriteFuture;
 
 /**
  * Callback for {@link ProtocolEncoder} to generate encoded messages such as
- * {@link IoBuffer}s.  {@link ProtocolEncoder} must call {@link #write(Object)}
+ * {@link IoBuffer}s. {@link ProtocolEncoder} must call {@link #write(Object)}
  * for each encoded message.
  *
  * @author <a href="http://mina.apache.org";>Apache MINA Project</a>
  */
 public interface ProtocolEncoderOutput {
-    /**
-     * Callback for {@link ProtocolEncoder} to generate an encoded message such
-     * as an {@link IoBuffer}. {@link ProtocolEncoder} must call
-     * {@link #write(Object)} for each encoded message.
-     *
-     * @param encodedMessage the encoded message, typically an {@link IoBuffer}
-     *                       or a {@link FileRegion}.
-     */
-    void write(Object encodedMessage);
-
-    /**
-     * Merges all buffers you wrote via {@link #write(Object)} into
-     * one {@link IoBuffer} and replaces the old fragmented ones with it.
-     * This method is useful when you want to control the way MINA generates
-     * network packets.  Please note that this method only works when you
-     * called {@link #write(Object)} method with only {@link IoBuffer}s.
-     * 
-     * @throws IllegalStateException if you wrote something else than {@link 
IoBuffer}
-     */
-    void mergeAll();
-
-    /**
-     * Flushes all buffers you wrote via {@link #write(Object)} to
-     * the session.  This operation is asynchronous; please wait for
-     * the returned {@link WriteFuture} if you want to wait for
-     * the buffers flushed.
-     *
-     * @return <tt>null</tt> if there is nothing to flush at all.
-     */
-    WriteFuture flush();
+       /**
+        * Callback for {@link ProtocolEncoder} to generate an encoded message 
such as
+        * an {@link IoBuffer}. {@link ProtocolEncoder} must call {@link 
#write(Object)}
+        * for each encoded message.
+        *
+        * @param message the encoded message, typically an {@link IoBuffer} or 
a
+        *                {@link FileRegion}.
+        */
+       void write(Object message);
 }
\ No newline at end of file
diff --git 
a/mina-http/src/test/java/org/apache/mina/http/HttpServerDecoderTest.java 
b/mina-http/src/test/java/org/apache/mina/http/HttpServerDecoderTest.java
index 87b886d..f8497b8 100644
--- a/mina-http/src/test/java/org/apache/mina/http/HttpServerDecoderTest.java
+++ b/mina-http/src/test/java/org/apache/mina/http/HttpServerDecoderTest.java
@@ -25,9 +25,9 @@ import static org.junit.Assert.assertTrue;
 import java.nio.charset.CharacterCodingException;
 import java.nio.charset.Charset;
 import java.nio.charset.CharsetEncoder;
+import java.util.Queue;
 
 import org.apache.mina.core.buffer.IoBuffer;
-import org.apache.mina.core.filterchain.IoFilter.NextFilter;
 import org.apache.mina.core.session.DummySession;
 import org.apache.mina.core.session.IoSession;
 import org.apache.mina.filter.codec.AbstractProtocolDecoderOutput;
@@ -37,262 +37,250 @@ import org.apache.mina.http.api.HttpRequest;
 import org.junit.Test;
 
 public class HttpServerDecoderTest {
-    private static final CharsetEncoder encoder = 
Charset.forName("US-ASCII").newEncoder(); //$NON-NLS-1$
+       private static final CharsetEncoder encoder = 
Charset.forName("US-ASCII").newEncoder(); //$NON-NLS-1$
 
-    private static final ProtocolDecoder decoder = new HttpServerDecoder();
+       private static final ProtocolDecoder decoder = new HttpServerDecoder();
 
-    /*
-     * Use a single session for all requests in order to test state management 
better
-     */
-    private static IoSession session = new DummySession();
+       /*
+        * Use a single session for all requests in order to test state 
management
+        * better
+        */
+       private static IoSession session = new DummySession();
 
-    /**
-     * Build an IO buffer containing a simple minimal HTTP request.
-     * 
-     * @param method the HTTP method
-     * @param body the option body
-     * @return the built IO buffer
-     * @throws CharacterCodingException if encoding fails
-     */
-    protected static IoBuffer getRequestBuffer(String method, String body) 
throws CharacterCodingException {
-        IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true);
-        buffer.putString(method + " / HTTP/1.1\r\nHost: dummy\r\n", encoder);
-        
-        if (body != null) {
-            buffer.putString("Content-Length: " + body.length() + "\r\n\r\n", 
encoder);
-            buffer.putString(body, encoder);
-        } else {
-            buffer.putString("\r\n", encoder);
-        }
-        
-        buffer.rewind();
-        
-        return buffer;
-    }
+       /**
+        * Build an IO buffer containing a simple minimal HTTP request.
+        * 
+        * @param method the HTTP method
+        * @param body   the option body
+        * @return the built IO buffer
+        * @throws CharacterCodingException if encoding fails
+        */
+       protected static IoBuffer getRequestBuffer(String method, String body) 
throws CharacterCodingException {
+               IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true);
+               buffer.putString(method + " / HTTP/1.1\r\nHost: dummy\r\n", 
encoder);
 
-    protected static IoBuffer getRequestBuffer(String method) throws 
CharacterCodingException {
-        return getRequestBuffer(method, null);
-    }
+               if (body != null) {
+                       buffer.putString("Content-Length: " + body.length() + 
"\r\n\r\n", encoder);
+                       buffer.putString(body, encoder);
+               } else {
+                       buffer.putString("\r\n", encoder);
+               }
 
-    /**
-     * Execute an HTPP request and return the queue of messages.
-     * 
-     * @param method the HTTP method
-     * @param body the optional body
-     * @return the protocol output and its queue of messages
-     * @throws Exception if error occurs (encoding,...)
-     */
-    protected static AbstractProtocolDecoderOutput executeRequest(String 
method, String body) throws Exception {
-        AbstractProtocolDecoderOutput out = new 
AbstractProtocolDecoderOutput() {
-            public void flush(NextFilter nextFilter, IoSession session) {
-            }
-        };
+               buffer.rewind();
 
-        IoBuffer buffer = getRequestBuffer(method, body); //$NON-NLS-1$
-        
-        while (buffer.hasRemaining()) {
-            decoder.decode(session, buffer, out);
-        }
-        
-        return out;
-    }
+               return buffer;
+       }
 
-    @Test
-    public void testGetRequestWithoutBody() throws Exception {
-        AbstractProtocolDecoderOutput out = executeRequest("GET", null);
-        assertEquals(2, out.getMessageQueue().size());
-        assertTrue(out.getMessageQueue().poll() instanceof HttpRequest);
-        assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
-    }
+       protected static IoBuffer getRequestBuffer(String method) throws 
CharacterCodingException {
+               return getRequestBuffer(method, null);
+       }
 
-    @Test
-    public void testGetRequestBody() throws Exception {
-        AbstractProtocolDecoderOutput out = executeRequest("GET", "body");
-        assertEquals(3, out.getMessageQueue().size());
-        assertTrue(out.getMessageQueue().poll() instanceof HttpRequest);
-        assertTrue(out.getMessageQueue().poll() instanceof IoBuffer);
-        assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
-    }
+       protected static class ProtocolDecoderQueue extends 
AbstractProtocolDecoderOutput {
+               public Queue<Object> getQueue() {
+                       return this.messageQueue;
+               }
+       }
 
-    @Test
-    public void testPutRequestWithoutBody() throws Exception {
-        AbstractProtocolDecoderOutput out = executeRequest("PUT", null);
-        assertEquals(2, out.getMessageQueue().size());
-        assertTrue(out.getMessageQueue().poll() instanceof HttpRequest);
-        assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
-    }
+       /**
+        * Execute an HTTP request and return the queue of messages.
+        * 
+        * @param method the HTTP method
+        * @param body   the optional body
+        * @return the protocol output and its queue of messages
+        * @throws Exception if error occurs (encoding,...)
+        */
+       protected static ProtocolDecoderQueue executeRequest(String method, 
String body) throws Exception {
+               ProtocolDecoderQueue out = new ProtocolDecoderQueue();
 
-    @Test
-    public void testPutRequestBody() throws Exception {
-        AbstractProtocolDecoderOutput out = executeRequest("PUT", "body");
-        assertEquals(3, out.getMessageQueue().size());
-        assertTrue(out.getMessageQueue().poll() instanceof HttpRequest);
-        assertTrue(out.getMessageQueue().poll() instanceof IoBuffer);
-        assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
-    }
+               IoBuffer buffer = getRequestBuffer(method, body); // $NON-NLS-1$
 
-    @Test
-    public void testPostRequestWithoutBody() throws Exception {
-        AbstractProtocolDecoderOutput out = executeRequest("POST", null);
-        assertEquals(2, out.getMessageQueue().size());
-        assertTrue(out.getMessageQueue().poll() instanceof HttpRequest);
-        assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
-    }
+               while (buffer.hasRemaining()) {
+                       decoder.decode(session, buffer, out);
+               }
 
-    @Test
-    public void testPostRequestBody() throws Exception {
-        AbstractProtocolDecoderOutput out = executeRequest("POST", "body");
-        assertEquals(3, out.getMessageQueue().size());
-        assertTrue(out.getMessageQueue().poll() instanceof HttpRequest);
-        assertTrue(out.getMessageQueue().poll() instanceof IoBuffer);
-        assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
-    }
+               return out;
+       }
 
-    @Test
-    public void testDeleteRequestWithoutBody() throws Exception {
-        AbstractProtocolDecoderOutput out = executeRequest("DELETE", null);
-        assertEquals(2, out.getMessageQueue().size());
-        assertTrue(out.getMessageQueue().poll() instanceof HttpRequest);
-        assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
-    }
+       @Test
+       public void testGetRequestWithoutBody() throws Exception {
+               ProtocolDecoderQueue out = executeRequest("GET", null);
+               assertEquals(2, out.getQueue().size());
+               assertTrue(out.getQueue().poll() instanceof HttpRequest);
+               assertTrue(out.getQueue().poll() instanceof HttpEndOfContent);
+       }
 
-    @Test
-    public void testDeleteRequestBody() throws Exception {
-        AbstractProtocolDecoderOutput out = executeRequest("DELETE", "body");
-        assertEquals(3, out.getMessageQueue().size());
-        assertTrue(out.getMessageQueue().poll() instanceof HttpRequest);
-        assertTrue(out.getMessageQueue().poll() instanceof IoBuffer);
-        assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
-    }
-    
-    @Test
-    public void testDIRMINA965NoContent() throws Exception {
-        AbstractProtocolDecoderOutput out = new 
AbstractProtocolDecoderOutput() {
-            public void flush(NextFilter nextFilter, IoSession session) {
-            }
-        };
-        IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true);
-        buffer.putString("GET / HTTP/1.1\r\nHost: ", encoder);
-        buffer.rewind();
-        while (buffer.hasRemaining()) {
-            decoder.decode(session, buffer, out);
-        }
-        buffer = IoBuffer.allocate(0).setAutoExpand(true);
-        buffer.putString("dummy\r\n\r\n", encoder);
-        buffer.rewind();
-        while (buffer.hasRemaining()) {
-            decoder.decode(session, buffer, out);
-        }
-        assertEquals(2, out.getMessageQueue().size());
-        assertTrue(out.getMessageQueue().poll() instanceof HttpRequest);
-        assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
-    }
+       @Test
+       public void testGetRequestBody() throws Exception {
+               ProtocolDecoderQueue out = executeRequest("GET", "body");
+               assertEquals(3, out.getQueue().size());
+               assertTrue(out.getQueue().poll() instanceof HttpRequest);
+               assertTrue(out.getQueue().poll() instanceof IoBuffer);
+               assertTrue(out.getQueue().poll() instanceof HttpEndOfContent);
+       }
 
-    @Test
-    public void testDIRMINA965WithContent() throws Exception {
-        AbstractProtocolDecoderOutput out = new 
AbstractProtocolDecoderOutput() {
-            public void flush(NextFilter nextFilter, IoSession session) {
-            }
-        };
-        IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true);
-        buffer.putString("GET / HTTP/1.1\r\nHost: ", encoder);
-        buffer.rewind();
-        while (buffer.hasRemaining()) {
-            decoder.decode(session, buffer, out);
-        }
-        buffer = IoBuffer.allocate(0).setAutoExpand(true);
-        buffer.putString("dummy\r\nContent-Length: 1\r\n\r\nA", encoder);
-        buffer.rewind();
-        while (buffer.hasRemaining()) {
-            decoder.decode(session, buffer, out);
-        }
-        assertEquals(3, out.getMessageQueue().size());
-        assertTrue(out.getMessageQueue().poll() instanceof HttpRequest);
-        assertTrue(out.getMessageQueue().poll() instanceof IoBuffer);
-        assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
-    }
-    @Test
-    public void testDIRMINA965WithContentOnTwoChunks() throws Exception {
-        AbstractProtocolDecoderOutput out = new 
AbstractProtocolDecoderOutput() {
-            public void flush(NextFilter nextFilter, IoSession session) {
-            }
-        };
-        IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true);
-        buffer.putString("GET / HTTP/1.1\r\nHost: ", encoder);
-        buffer.rewind();
-        while (buffer.hasRemaining()) {
-            decoder.decode(session, buffer, out);
-        }
-        buffer = IoBuffer.allocate(0).setAutoExpand(true);
-        buffer.putString("dummy\r\nContent-Length: 2\r\n\r\nA", encoder);
-        buffer.rewind();
-        while (buffer.hasRemaining()) {
-            decoder.decode(session, buffer, out);
-        }
-        buffer = IoBuffer.allocate(0).setAutoExpand(true);
-        buffer.putString("B", encoder);
-        buffer.rewind();
-        while (buffer.hasRemaining()) {
-            decoder.decode(session, buffer, out);
-        }
-        assertEquals(4, out.getMessageQueue().size());
-        assertTrue(out.getMessageQueue().poll() instanceof HttpRequest);
-        assertTrue(out.getMessageQueue().poll() instanceof IoBuffer);
-        assertTrue(out.getMessageQueue().poll() instanceof IoBuffer);
-        assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
-    }
-    
-    @Test
-    public void verifyThatHeaderWithoutLeadingSpaceIsSupported() throws 
Exception {
-        AbstractProtocolDecoderOutput out = new 
AbstractProtocolDecoderOutput() {
-            public void flush(NextFilter nextFilter, IoSession session) {
-            }
-        };
-        IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true);
-        buffer.putString("GET / HTTP/1.0\r\nHost:localhost\r\n\r\n", encoder);
-        buffer.rewind();
-        while (buffer.hasRemaining()) {
-            decoder.decode(session, buffer, out);
-        }
-        assertEquals(2, out.getMessageQueue().size());
-        HttpRequest request = (HttpRequest) out.getMessageQueue().poll();
-        assertEquals("localhost", request.getHeader("host"));
-        assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
-    }
+       @Test
+       public void testPutRequestWithoutBody() throws Exception {
+               ProtocolDecoderQueue out = executeRequest("PUT", null);
+               assertEquals(2, out.getQueue().size());
+               assertTrue(out.getQueue().poll() instanceof HttpRequest);
+               assertTrue(out.getQueue().poll() instanceof HttpEndOfContent);
+       }
 
-    @Test
-    public void verifyThatLeadingSpacesAreRemovedFromHeader() throws Exception 
{
-        AbstractProtocolDecoderOutput out = new 
AbstractProtocolDecoderOutput() {
-            public void flush(NextFilter nextFilter, IoSession session) {
-            }
-        };
-        IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true);
-        buffer.putString("GET / HTTP/1.0\r\nHost:  localhost\r\n\r\n", 
encoder);
-        buffer.rewind();
-        while (buffer.hasRemaining()) {
-            decoder.decode(session, buffer, out);
-        }
-        assertEquals(2, out.getMessageQueue().size());
-        HttpRequest request = (HttpRequest) out.getMessageQueue().poll();
-        assertEquals("localhost", request.getHeader("host"));
-        assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
-    }
+       /**
+        * Runs {@code executeRequest("PUT", "body")} and checks that the decoder
+        * produced exactly three messages, in order: an {@link HttpRequest}, one
+        * {@link IoBuffer} and an {@link HttpEndOfContent}.
+        */
+       @Test
+       public void testPutRequestBody() throws Exception {
+               final Class<?>[] expectedTypes = { HttpRequest.class, IoBuffer.class, HttpEndOfContent.class };
+               final ProtocolDecoderQueue decoded = executeRequest("PUT", "body");
+
+               assertEquals(expectedTypes.length, decoded.getQueue().size());
+               // Drain the queue head-first; each polled message must match the next expected type.
+               for (Class<?> expectedType : expectedTypes) {
+                       assertTrue(expectedType.isInstance(decoded.getQueue().poll()));
+               }
+       }
 
-    @Test
-    public void verifyThatTrailingSpacesAreRemovedFromHeader() throws 
Exception {
-        AbstractProtocolDecoderOutput out = new 
AbstractProtocolDecoderOutput() {
-            public void flush(NextFilter nextFilter, IoSession session) {
-            }
-        };
-        IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true);
-        buffer.putString("GET / HTTP/1.0\r\nHost:localhost  \r\n\r\n", 
encoder);
-        buffer.rewind();
-        while (buffer.hasRemaining()) {
-            decoder.decode(session, buffer, out);
-        }
-        assertEquals(2, out.getMessageQueue().size());
-        HttpRequest request = (HttpRequest) out.getMessageQueue().poll();
-        assertEquals("localhost", request.getHeader("host"));
-        assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
-    }
+       /**
+        * Runs {@code executeRequest("POST", null)} (no body, per the test name) and
+        * checks that the decoder produced exactly two messages, in order: an
+        * {@link HttpRequest} followed by an {@link HttpEndOfContent}.
+        */
+       @Test
+       public void testPostRequestWithoutBody() throws Exception {
+               final Class<?>[] expectedTypes = { HttpRequest.class, HttpEndOfContent.class };
+               final ProtocolDecoderQueue decoded = executeRequest("POST", null);
+
+               assertEquals(expectedTypes.length, decoded.getQueue().size());
+               // Drain the queue head-first; each polled message must match the next expected type.
+               for (Class<?> expectedType : expectedTypes) {
+                       assertTrue(expectedType.isInstance(decoded.getQueue().poll()));
+               }
+       }
+
+       /**
+        * Runs {@code executeRequest("POST", "body")} and checks that the decoder
+        * produced exactly three messages, in order: an {@link HttpRequest}, one
+        * {@link IoBuffer} and an {@link HttpEndOfContent}.
+        */
+       @Test
+       public void testPostRequestBody() throws Exception {
+               final Class<?>[] expectedTypes = { HttpRequest.class, IoBuffer.class, HttpEndOfContent.class };
+               final ProtocolDecoderQueue decoded = executeRequest("POST", "body");
+
+               assertEquals(expectedTypes.length, decoded.getQueue().size());
+               // Drain the queue head-first; each polled message must match the next expected type.
+               for (Class<?> expectedType : expectedTypes) {
+                       assertTrue(expectedType.isInstance(decoded.getQueue().poll()));
+               }
+       }
+
+       /**
+        * Runs {@code executeRequest("DELETE", null)} (no body, per the test name)
+        * and checks that the decoder produced exactly two messages, in order: an
+        * {@link HttpRequest} followed by an {@link HttpEndOfContent}.
+        */
+       @Test
+       public void testDeleteRequestWithoutBody() throws Exception {
+               final Class<?>[] expectedTypes = { HttpRequest.class, HttpEndOfContent.class };
+               final ProtocolDecoderQueue decoded = executeRequest("DELETE", null);
+
+               assertEquals(expectedTypes.length, decoded.getQueue().size());
+               // Drain the queue head-first; each polled message must match the next expected type.
+               for (Class<?> expectedType : expectedTypes) {
+                       assertTrue(expectedType.isInstance(decoded.getQueue().poll()));
+               }
+       }
+
+       /**
+        * Runs {@code executeRequest("DELETE", "body")} and checks that the decoder
+        * produced exactly three messages, in order: an {@link HttpRequest}, one
+        * {@link IoBuffer} and an {@link HttpEndOfContent}.
+        */
+       @Test
+       public void testDeleteRequestBody() throws Exception {
+               final Class<?>[] expectedTypes = { HttpRequest.class, IoBuffer.class, HttpEndOfContent.class };
+               final ProtocolDecoderQueue decoded = executeRequest("DELETE", "body");
+
+               assertEquals(expectedTypes.length, decoded.getQueue().size());
+               // Drain the queue head-first; each polled message must match the next expected type.
+               for (Class<?> expectedType : expectedTypes) {
+                       assertTrue(expectedType.isInstance(decoded.getQueue().poll()));
+               }
+       }
+
+       /**
+        * Feeds a GET request to the decoder in two separate buffers, the split
+        * falling inside the {@code Host} header value, with no content. Expects
+        * exactly two decoded messages: an {@link HttpRequest} followed by an
+        * {@link HttpEndOfContent}.
+        */
+       @Test
+       public void testDIRMINA965NoContent() throws Exception {
+               final String[] fragments = { "GET / HTTP/1.1\r\nHost: ", "dummy\r\n\r\n" };
+               final ProtocolDecoderQueue decoded = new ProtocolDecoderQueue();
+
+               // Every fragment goes through the decoder from its own freshly allocated buffer.
+               for (String fragment : fragments) {
+                       final IoBuffer chunk = IoBuffer.allocate(0).setAutoExpand(true);
+                       chunk.putString(fragment, encoder);
+                       chunk.rewind();
+                       while (chunk.hasRemaining()) {
+                               decoder.decode(session, chunk, decoded);
+                       }
+               }
+
+               assertEquals(2, decoded.getQueue().size());
+               final Object head = decoded.getQueue().poll();
+               assertTrue(head instanceof HttpRequest);
+               final Object tail = decoded.getQueue().poll();
+               assertTrue(tail instanceof HttpEndOfContent);
+       }
+
+       /**
+        * Feeds a GET request to the decoder in two separate buffers, the split
+        * falling inside the {@code Host} header value; the second buffer declares
+        * {@code Content-Length: 1} and carries the single content byte. Expects
+        * exactly three decoded messages: an {@link HttpRequest}, one
+        * {@link IoBuffer} and an {@link HttpEndOfContent}.
+        */
+       @Test
+       public void testDIRMINA965WithContent() throws Exception {
+               final String[] fragments = { "GET / HTTP/1.1\r\nHost: ", "dummy\r\nContent-Length: 1\r\n\r\nA" };
+               final ProtocolDecoderQueue decoded = new ProtocolDecoderQueue();
+
+               // Every fragment goes through the decoder from its own freshly allocated buffer.
+               for (String fragment : fragments) {
+                       final IoBuffer chunk = IoBuffer.allocate(0).setAutoExpand(true);
+                       chunk.putString(fragment, encoder);
+                       chunk.rewind();
+                       while (chunk.hasRemaining()) {
+                               decoder.decode(session, chunk, decoded);
+                       }
+               }
+
+               assertEquals(3, decoded.getQueue().size());
+               final Object head = decoded.getQueue().poll();
+               assertTrue(head instanceof HttpRequest);
+               final Object content = decoded.getQueue().poll();
+               assertTrue(content instanceof IoBuffer);
+               final Object tail = decoded.getQueue().poll();
+               assertTrue(tail instanceof HttpEndOfContent);
+       }
+
+       /**
+        * Feeds a GET request to the decoder in three separate buffers: the head up
+        * to the middle of the {@code Host} header, the remaining headers
+        * (declaring {@code Content-Length: 2}) plus the first content byte, and
+        * finally the second content byte. Expects exactly four decoded messages:
+        * an {@link HttpRequest}, two {@link IoBuffer}s and an
+        * {@link HttpEndOfContent}.
+        */
+       @Test
+       public void testDIRMINA965WithContentOnTwoChunks() throws Exception {
+               final String[] fragments = { "GET / HTTP/1.1\r\nHost: ", "dummy\r\nContent-Length: 2\r\n\r\nA", "B" };
+               final Class<?>[] expectedTypes = { HttpRequest.class, IoBuffer.class, IoBuffer.class, HttpEndOfContent.class };
+               final ProtocolDecoderQueue decoded = new ProtocolDecoderQueue();
+
+               // Every fragment goes through the decoder from its own freshly allocated buffer.
+               for (String fragment : fragments) {
+                       final IoBuffer chunk = IoBuffer.allocate(0).setAutoExpand(true);
+                       chunk.putString(fragment, encoder);
+                       chunk.rewind();
+                       while (chunk.hasRemaining()) {
+                               decoder.decode(session, chunk, decoded);
+                       }
+               }
+
+               assertEquals(expectedTypes.length, decoded.getQueue().size());
+               // Drain the queue head-first; each polled message must match the next expected type.
+               for (Class<?> expectedType : expectedTypes) {
+                       assertTrue(expectedType.isInstance(decoded.getQueue().poll()));
+               }
+       }
+
+       /**
+        * Decodes a request whose {@code Host} header has no space between the
+        * colon and the value ({@code Host:localhost}) and checks that the header
+        * is read back as {@code "localhost"}, followed by an
+        * {@link HttpEndOfContent}.
+        */
+       @Test
+       public void verifyThatHeaderWithoutLeadingSpaceIsSupported() throws Exception {
+               final String rawRequest = "GET / HTTP/1.0\r\nHost:localhost\r\n\r\n";
+               final ProtocolDecoderQueue decoded = new ProtocolDecoderQueue();
+
+               final IoBuffer input = IoBuffer.allocate(0).setAutoExpand(true);
+               input.putString(rawRequest, encoder);
+               input.rewind();
+               while (input.hasRemaining()) {
+                       decoder.decode(session, input, decoded);
+               }
+
+               assertEquals(2, decoded.getQueue().size());
+               final Object head = decoded.getQueue().poll();
+               assertEquals("localhost", ((HttpRequest) head).getHeader("host"));
+               final Object tail = decoded.getQueue().poll();
+               assertTrue(tail instanceof HttpEndOfContent);
+       }
+
+       /**
+        * Decodes a request whose {@code Host} header value is preceded by two
+        * spaces ({@code Host:  localhost}) and checks that the header is read back
+        * as {@code "localhost"}, followed by an {@link HttpEndOfContent}.
+        */
+       @Test
+       public void verifyThatLeadingSpacesAreRemovedFromHeader() throws Exception {
+               final String rawRequest = "GET / HTTP/1.0\r\nHost:  localhost\r\n\r\n";
+               final ProtocolDecoderQueue decoded = new ProtocolDecoderQueue();
+
+               final IoBuffer input = IoBuffer.allocate(0).setAutoExpand(true);
+               input.putString(rawRequest, encoder);
+               input.rewind();
+               while (input.hasRemaining()) {
+                       decoder.decode(session, input, decoded);
+               }
+
+               assertEquals(2, decoded.getQueue().size());
+               final Object head = decoded.getQueue().poll();
+               assertEquals("localhost", ((HttpRequest) head).getHeader("host"));
+               final Object tail = decoded.getQueue().poll();
+               assertTrue(tail instanceof HttpEndOfContent);
+       }
+
+       /**
+        * Decodes a request whose {@code Host} header value is followed by two
+        * spaces ({@code Host:localhost  }) and checks that the header is read back
+        * as {@code "localhost"}, followed by an {@link HttpEndOfContent}.
+        */
+       @Test
+       public void verifyThatTrailingSpacesAreRemovedFromHeader() throws Exception {
+               final String rawRequest = "GET / HTTP/1.0\r\nHost:localhost  \r\n\r\n";
+               final ProtocolDecoderQueue decoded = new ProtocolDecoderQueue();
+
+               final IoBuffer input = IoBuffer.allocate(0).setAutoExpand(true);
+               input.putString(rawRequest, encoder);
+               input.rewind();
+               while (input.hasRemaining()) {
+                       decoder.decode(session, input, decoded);
+               }
+
+               assertEquals(2, decoded.getQueue().size());
+               final Object head = decoded.getQueue().poll();
+               assertEquals("localhost", ((HttpRequest) head).getHeader("host"));
+               final Object tail = decoded.getQueue().poll();
+               assertTrue(tail instanceof HttpEndOfContent);
+       }
 }

Reply via email to