This is an automated email from the ASF dual-hosted git repository. johnnyv pushed a commit to branch 2.1.X in repository https://gitbox.apache.org/repos/asf/mina.git
commit 95b5e041427407fb51aac541d16f45fe16f3e6b0 Author: Jonathan Valliere <john...@apache.org> AuthorDate: Mon May 17 12:22:06 2021 -0400 Fix DIRMINA-1142 Removes the code attempting to reuse the WriteRequest because the empty check is not valid in concurrent applications since the queue is not thread-local. --- .../mina/filter/codec/ProtocolCodecFilter.java | 994 ++++++++++----------- 1 file changed, 490 insertions(+), 504 deletions(-) diff --git a/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java b/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java index a460b3d..55b799c 100644 --- a/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java +++ b/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java @@ -47,509 +47,495 @@ import org.slf4j.LoggerFactory; * @org.apache.xbean.XBean */ public class ProtocolCodecFilter extends IoFilterAdapter { - /** A logger for this class */ - private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolCodecFilter.class); - - private static final Class<?>[] EMPTY_PARAMS = new Class[0]; - - private static final IoBuffer EMPTY_BUFFER = IoBuffer.wrap(new byte[0]); - - private static final AttributeKey ENCODER = new AttributeKey(ProtocolCodecFilter.class, "encoder"); - - private static final AttributeKey DECODER = new AttributeKey(ProtocolCodecFilter.class, "decoder"); - - private static final AttributeKey DECODER_OUT = new AttributeKey(ProtocolCodecFilter.class, "decoderOut"); - - private static final AttributeKey ENCODER_OUT = new AttributeKey(ProtocolCodecFilter.class, "encoderOut"); - - /** The factory responsible for creating the encoder and decoder */ - private final ProtocolCodecFactory factory; - - /** - * Creates a new instance of ProtocolCodecFilter, associating a factory - * for the creation of the encoder and decoder. - * - * @param factory The associated factory - */ - public ProtocolCodecFilter(ProtocolCodecFactory factory) { - if (factory == null) { - throw new IllegalArgumentException("factory"); - } - - this.factory = factory; - } - - /** - * Creates a new instance of ProtocolCodecFilter, without any factory. - * The encoder/decoder factory will be created as an inner class, using - * the two parameters (encoder and decoder). - * - * @param encoder The class responsible for encoding the message - * @param decoder The class responsible for decoding the message - */ - public ProtocolCodecFilter(final ProtocolEncoder encoder, final ProtocolDecoder decoder) { - if (encoder == null) { - throw new IllegalArgumentException("encoder"); - } - if (decoder == null) { - throw new IllegalArgumentException("decoder"); - } - - // Create the inner Factory based on the two parameters - this.factory = new ProtocolCodecFactory() { - /** - * {@inheritDoc} - */ - @Override - public ProtocolEncoder getEncoder(IoSession session) { - return encoder; - } - - /** - * {@inheritDoc} - */ - @Override - public ProtocolDecoder getDecoder(IoSession session) { - return decoder; - } - }; - } - - /** - * Creates a new instance of ProtocolCodecFilter, without any factory. - * The encoder/decoder factory will be created as an inner class, using - * the two parameters (encoder and decoder), which are class names. Instances - * for those classes will be created in this constructor. - * - * @param encoderClass The class responsible for encoding the message - * @param decoderClass The class responsible for decoding the message - */ - public ProtocolCodecFilter(final Class<? extends ProtocolEncoder> encoderClass, - final Class<? extends ProtocolDecoder> decoderClass) { - if (encoderClass == null) { - throw new IllegalArgumentException("encoderClass"); - } - if (decoderClass == null) { - throw new IllegalArgumentException("decoderClass"); - } - if (!ProtocolEncoder.class.isAssignableFrom(encoderClass)) { - throw new IllegalArgumentException("encoderClass: " + encoderClass.getName()); - } - if (!ProtocolDecoder.class.isAssignableFrom(decoderClass)) { - throw new IllegalArgumentException("decoderClass: " + decoderClass.getName()); - } - try { - encoderClass.getConstructor(EMPTY_PARAMS); - } catch (NoSuchMethodException e) { - throw new IllegalArgumentException("encoderClass doesn't have a public default constructor."); - } - try { - decoderClass.getConstructor(EMPTY_PARAMS); - } catch (NoSuchMethodException e) { - throw new IllegalArgumentException("decoderClass doesn't have a public default constructor."); - } - - final ProtocolEncoder encoder; - - try { - encoder = encoderClass.newInstance(); - } catch (Exception e) { - throw new IllegalArgumentException("encoderClass cannot be initialized"); - } - - final ProtocolDecoder decoder; - - try { - decoder = decoderClass.newInstance(); - } catch (Exception e) { - throw new IllegalArgumentException("decoderClass cannot be initialized"); - } - - // Create the inner factory based on the two parameters. - this.factory = new ProtocolCodecFactory() { - /** - * {@inheritDoc} - */ - @Override - public ProtocolEncoder getEncoder(IoSession session) throws Exception { - return encoder; - } - - /** - * {@inheritDoc} - */ - @Override - public ProtocolDecoder getDecoder(IoSession session) throws Exception { - return decoder; - } - }; - } - - /** - * Get the encoder instance from a given session. - * - * @param session The associated session we will get the encoder from - * @return The encoder instance, if any - */ - public ProtocolEncoder getEncoder(IoSession session) { - return (ProtocolEncoder) session.getAttribute(ENCODER); - } - - /** - * {@inheritDoc} - */ - @Override - public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception { - if (parent.contains(this)) { - throw new IllegalArgumentException( - "You can't add the same filter instance more than once. Create another instance and add it."); - } - } - - /** - * {@inheritDoc} - */ - @Override - public void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception { - // Clean everything - disposeCodec(parent.getSession()); - } - - /** - * Process the incoming message, calling the session decoder. As the incoming - * buffer might contains more than one messages, we have to loop until the decoder - * throws an exception. - * - * while ( buffer not empty ) - * try - * decode ( buffer ) - * catch - * break; - * - */ - @Override - public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Processing a MESSAGE_RECEIVED for session {}", session.getId()); - } - - if (!(message instanceof IoBuffer)) { - nextFilter.messageReceived(session, message); - return; - } - - IoBuffer in = (IoBuffer) message; - ProtocolDecoder decoder = factory.getDecoder(session); - ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter); - - // Loop until we don't have anymore byte in the buffer, - // or until the decoder throws an unrecoverable exception or - // can't decoder a message, because there are not enough - // data in the buffer - while (in.hasRemaining()) { - int oldPos = in.position(); - try { - synchronized (session) { - // Call the decoder with the read bytes - decoder.decode(session, in, decoderOut); - } - // Finish decoding if no exception was thrown. - decoderOut.flush(nextFilter, session); - } catch (Exception e) { - ProtocolDecoderException pde; - if (e instanceof ProtocolDecoderException) { - pde = (ProtocolDecoderException) e; - } else { - pde = new ProtocolDecoderException(e); - } - if (pde.getHexdump() == null) { - // Generate a message hex dump - int curPos = in.position(); - in.position(oldPos); - pde.setHexdump(in.getHexDump()); - in.position(curPos); - } - // Fire the exceptionCaught event. - decoderOut.flush(nextFilter, session); - nextFilter.exceptionCaught(session, pde); - // Retry only if the type of the caught exception is - // recoverable and the buffer position has changed. - // We check buffer position additionally to prevent an - // infinite loop. - if (!(e instanceof RecoverableProtocolDecoderException) || (in.position() == oldPos)) { - break; - } - } - } - } - - /** - * {@inheritDoc} - */ - @Override - public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { - if (writeRequest instanceof EncodedWriteRequest) { - return; - } - - nextFilter.messageSent(session, writeRequest); - } - - /** - * {@inheritDoc} - */ - @Override - public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { - Object message = writeRequest.getMessage(); - - // Bypass the encoding if the message is contained in a IoBuffer, - // as it has already been encoded before - if ((message instanceof IoBuffer) || (message instanceof FileRegion)) { - nextFilter.filterWrite(session, writeRequest); - return; - } - - // Get the encoder in the session - ProtocolEncoder encoder = factory.getEncoder(session); - - ProtocolEncoderOutput encoderOut = getEncoderOut(session, nextFilter, writeRequest); - - if (encoder == null) { - throw new ProtocolEncoderException("The encoder is null for the session " + session); - } - - try { - // Now we can try to encode the response - encoder.encode(session, message, encoderOut); - - // Send it directly - Queue<Object> bufferQueue = ((AbstractProtocolEncoderOutput) encoderOut).getMessageQueue(); - - // Write all the encoded messages now - while (!bufferQueue.isEmpty()) { - Object encodedMessage = bufferQueue.poll(); - - if (encodedMessage == null) { - break; - } - - // Flush only when the buffer has remaining. - if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) { - if (bufferQueue.isEmpty()) { - writeRequest.setMessage(encodedMessage); + /** A logger for this class */ + private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolCodecFilter.class); + + private static final Class<?>[] EMPTY_PARAMS = new Class[0]; + + private static final IoBuffer EMPTY_BUFFER = IoBuffer.wrap(new byte[0]); + + private static final AttributeKey ENCODER = new AttributeKey(ProtocolCodecFilter.class, "encoder"); + + private static final AttributeKey DECODER = new AttributeKey(ProtocolCodecFilter.class, "decoder"); + + private static final AttributeKey DECODER_OUT = new AttributeKey(ProtocolCodecFilter.class, "decoderOut"); + + private static final AttributeKey ENCODER_OUT = new AttributeKey(ProtocolCodecFilter.class, "encoderOut"); + + /** The factory responsible for creating the encoder and decoder */ + private final ProtocolCodecFactory factory; + + /** + * Creates a new instance of ProtocolCodecFilter, associating a factory for the + * creation of the encoder and decoder. + * + * @param factory The associated factory + */ + public ProtocolCodecFilter(ProtocolCodecFactory factory) { + if (factory == null) { + throw new IllegalArgumentException("factory"); + } + + this.factory = factory; + } + + /** + * Creates a new instance of ProtocolCodecFilter, without any factory. The + * encoder/decoder factory will be created as an inner class, using the two + * parameters (encoder and decoder). + * + * @param encoder The class responsible for encoding the message + * @param decoder The class responsible for decoding the message + */ + public ProtocolCodecFilter(final ProtocolEncoder encoder, final ProtocolDecoder decoder) { + if (encoder == null) { + throw new IllegalArgumentException("encoder"); + } + if (decoder == null) { + throw new IllegalArgumentException("decoder"); + } + + // Create the inner Factory based on the two parameters + this.factory = new ProtocolCodecFactory() { + /** + * {@inheritDoc} + */ + @Override + public ProtocolEncoder getEncoder(IoSession session) { + return encoder; + } + + /** + * {@inheritDoc} + */ + @Override + public ProtocolDecoder getDecoder(IoSession session) { + return decoder; + } + }; + } + + /** + * Creates a new instance of ProtocolCodecFilter, without any factory. The + * encoder/decoder factory will be created as an inner class, using the two + * parameters (encoder and decoder), which are class names. Instances for those + * classes will be created in this constructor. + * + * @param encoderClass The class responsible for encoding the message + * @param decoderClass The class responsible for decoding the message + */ + public ProtocolCodecFilter(final Class<? extends ProtocolEncoder> encoderClass, + final Class<? extends ProtocolDecoder> decoderClass) { + if (encoderClass == null) { + throw new IllegalArgumentException("encoderClass"); + } + if (decoderClass == null) { + throw new IllegalArgumentException("decoderClass"); + } + if (!ProtocolEncoder.class.isAssignableFrom(encoderClass)) { + throw new IllegalArgumentException("encoderClass: " + encoderClass.getName()); + } + if (!ProtocolDecoder.class.isAssignableFrom(decoderClass)) { + throw new IllegalArgumentException("decoderClass: " + decoderClass.getName()); + } + try { + encoderClass.getConstructor(EMPTY_PARAMS); + } catch (NoSuchMethodException e) { + throw new IllegalArgumentException("encoderClass doesn't have a public default constructor."); + } + try { + decoderClass.getConstructor(EMPTY_PARAMS); + } catch (NoSuchMethodException e) { + throw new IllegalArgumentException("decoderClass doesn't have a public default constructor."); + } + + final ProtocolEncoder encoder; + + try { + encoder = encoderClass.newInstance(); + } catch (Exception e) { + throw new IllegalArgumentException("encoderClass cannot be initialized"); + } + + final ProtocolDecoder decoder; + + try { + decoder = decoderClass.newInstance(); + } catch (Exception e) { + throw new IllegalArgumentException("decoderClass cannot be initialized"); + } + + // Create the inner factory based on the two parameters. + this.factory = new ProtocolCodecFactory() { + /** + * {@inheritDoc} + */ + @Override + public ProtocolEncoder getEncoder(IoSession session) throws Exception { + return encoder; + } + + /** + * {@inheritDoc} + */ + @Override + public ProtocolDecoder getDecoder(IoSession session) throws Exception { + return decoder; + } + }; + } + + /** + * Get the encoder instance from a given session. + * + * @param session The associated session we will get the encoder from + * @return The encoder instance, if any + */ + public ProtocolEncoder getEncoder(IoSession session) { + return (ProtocolEncoder) session.getAttribute(ENCODER); + } + + /** + * {@inheritDoc} + */ + @Override + public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception { + if (parent.contains(this)) { + throw new IllegalArgumentException( + "You can't add the same filter instance more than once. Create another instance and add it."); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception { + // Clean everything + disposeCodec(parent.getSession()); + } + + /** + * Process the incoming message, calling the session decoder. As the incoming + * buffer might contains more than one messages, we have to loop until the + * decoder throws an exception. + * + * while ( buffer not empty ) try decode ( buffer ) catch break; + * + */ + @Override + public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Processing a MESSAGE_RECEIVED for session {}", session.getId()); + } + + if (!(message instanceof IoBuffer)) { + nextFilter.messageReceived(session, message); + return; + } + + IoBuffer in = (IoBuffer) message; + ProtocolDecoder decoder = factory.getDecoder(session); + ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter); + + // Loop until we don't have anymore byte in the buffer, + // or until the decoder throws an unrecoverable exception or + // can't decoder a message, because there are not enough + // data in the buffer + while (in.hasRemaining()) { + int oldPos = in.position(); + try { + synchronized (session) { + // Call the decoder with the read bytes + decoder.decode(session, in, decoderOut); + } + // Finish decoding if no exception was thrown. + decoderOut.flush(nextFilter, session); + } catch (Exception e) { + ProtocolDecoderException pde; + if (e instanceof ProtocolDecoderException) { + pde = (ProtocolDecoderException) e; + } else { + pde = new ProtocolDecoderException(e); + } + if (pde.getHexdump() == null) { + // Generate a message hex dump + int curPos = in.position(); + in.position(oldPos); + pde.setHexdump(in.getHexDump()); + in.position(curPos); + } + // Fire the exceptionCaught event. + decoderOut.flush(nextFilter, session); + nextFilter.exceptionCaught(session, pde); + // Retry only if the type of the caught exception is + // recoverable and the buffer position has changed. + // We check buffer position additionally to prevent an + // infinite loop. + if (!(e instanceof RecoverableProtocolDecoderException) || (in.position() == oldPos)) { + break; + } + } + } + } + + /** + * {@inheritDoc} + */ + @Override + public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { + if (writeRequest instanceof EncodedWriteRequest) { + return; + } + + nextFilter.messageSent(session, writeRequest); + } + + /** + * {@inheritDoc} + */ + @Override + public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { + Object message = writeRequest.getMessage(); + + // Bypass the encoding if the message is contained in a IoBuffer, + // as it has already been encoded before + if ((message instanceof IoBuffer) || (message instanceof FileRegion)) { nextFilter.filterWrite(session, writeRequest); - } else { - SocketAddress destination = writeRequest.getDestination(); - WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination); - nextFilter.filterWrite(session, encodedWriteRequest); - } - } - } - } catch (Exception e) { - ProtocolEncoderException pee; - - // Generate the correct exception - if (e instanceof ProtocolEncoderException) { - pee = (ProtocolEncoderException) e; - } else { - pee = new ProtocolEncoderException(e); - } - - throw pee; - } - } - - /** - * {@inheritDoc} - */ - @Override - public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception { - // Call finishDecode() first when a connection is closed. - ProtocolDecoder decoder = factory.getDecoder(session); - ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter); - - try { - decoder.finishDecode(session, decoderOut); - } catch (Exception e) { - ProtocolDecoderException pde; - if (e instanceof ProtocolDecoderException) { - pde = (ProtocolDecoderException) e; - } else { - pde = new ProtocolDecoderException(e); - } - throw pde; - } finally { - // Dispose everything - disposeCodec(session); - decoderOut.flush(nextFilter, session); - } - - // Call the next filter - nextFilter.sessionClosed(session); - } - - private static class EncodedWriteRequest extends DefaultWriteRequest { - public EncodedWriteRequest(Object encodedMessage, WriteFuture future, SocketAddress destination) { - super(encodedMessage, future, destination); - } - - /** - * {@inheritDoc} - */ - @Override - public boolean isEncoded() { - return true; - } - } - - private static class ProtocolDecoderOutputImpl extends AbstractProtocolDecoderOutput { - public ProtocolDecoderOutputImpl() { - // Do nothing - } - - /** - * {@inheritDoc} - */ - @Override - public void flush(NextFilter nextFilter, IoSession session) { - Queue<Object> messageQueue = getMessageQueue(); - - while (!messageQueue.isEmpty()) { - nextFilter.messageReceived(session, messageQueue.poll()); - } - } - } - - private static class ProtocolEncoderOutputImpl extends AbstractProtocolEncoderOutput { - private final IoSession session; - - private final NextFilter nextFilter; - - /** The WriteRequest destination */ - private final SocketAddress destination; - - public ProtocolEncoderOutputImpl(IoSession session, NextFilter nextFilter, WriteRequest writeRequest) { - this.session = session; - this.nextFilter = nextFilter; - - // Only store the destination, not the full WriteRequest. - destination = writeRequest.getDestination(); - } - - /** - * {@inheritDoc} - */ - @Override - public WriteFuture flush() { - Queue<Object> bufferQueue = getMessageQueue(); - WriteFuture future = null; - - while (!bufferQueue.isEmpty()) { - Object encodedMessage = bufferQueue.poll(); - - if (encodedMessage == null) { - break; - } - - // Flush only when the buffer has remaining. - if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) { - future = new DefaultWriteFuture(session); - nextFilter.filterWrite(session, new EncodedWriteRequest(encodedMessage, future, destination)); - } - } - - if (future == null) { - // Creates an empty writeRequest containing the destination - future = DefaultWriteFuture.newNotWrittenFuture(session, new NothingWrittenException(AbstractIoSession.MESSAGE_SENT_REQUEST)); - } - - return future; - } - } - - //----------- Helper methods --------------------------------------------- - /** - * Dispose the encoder, decoder, and the callback for the decoded - * messages. - */ - private void disposeCodec(IoSession session) { - // We just remove the two instances of encoder/decoder to release resources - // from the session - disposeEncoder(session); - disposeDecoder(session); - - // We also remove the callback - disposeDecoderOut(session); - } - - /** - * Dispose the encoder, removing its instance from the - * session's attributes, and calling the associated - * dispose method. - */ - private void disposeEncoder(IoSession session) { - ProtocolEncoder encoder = (ProtocolEncoder) session.removeAttribute(ENCODER); - if (encoder == null) { - return; - } - - try { - encoder.dispose(session); - } catch (Exception e) { - LOGGER.warn("Failed to dispose: " + encoder.getClass().getName() + " (" + encoder + ')'); - } - } - - /** - * Dispose the decoder, removing its instance from the - * session's attributes, and calling the associated - * dispose method. - */ - private void disposeDecoder(IoSession session) { - ProtocolDecoder decoder = (ProtocolDecoder) session.removeAttribute(DECODER); - if (decoder == null) { - return; - } - - try { - decoder.dispose(session); - } catch (Exception e) { - LOGGER.warn("Failed to dispose: " + decoder.getClass().getName() + " (" + decoder + ')'); - } - } - - /** - * Return a reference to the decoder callback. If it's not already created - * and stored into the session, we create a new instance. - */ - private ProtocolDecoderOutput getDecoderOut(IoSession session, NextFilter nextFilter) { - ProtocolDecoderOutput out = (ProtocolDecoderOutput) session.getAttribute(DECODER_OUT); - - if (out == null) { - // Create a new instance, and stores it into the session - out = new ProtocolDecoderOutputImpl(); - session.setAttribute(DECODER_OUT, out); - } - - return out; - } - - private ProtocolEncoderOutput getEncoderOut(IoSession session, NextFilter nextFilter, WriteRequest writeRequest) { - ProtocolEncoderOutput out = (ProtocolEncoderOutput) session.getAttribute(ENCODER_OUT); - - if (out == null) { - // Create a new instance, and stores it into the session - out = new ProtocolEncoderOutputImpl(session, nextFilter, writeRequest); - session.setAttribute(ENCODER_OUT, out); - } - - return out; - } - - /** - * Remove the decoder callback from the session's attributes. - */ - private void disposeDecoderOut(IoSession session) { - session.removeAttribute(DECODER_OUT); - } + return; + } + + // Get the encoder in the session + ProtocolEncoder encoder = factory.getEncoder(session); + + ProtocolEncoderOutput encoderOut = getEncoderOut(session, nextFilter, writeRequest); + + if (encoder == null) { + throw new ProtocolEncoderException("The encoder is null for the session " + session); + } + + try { + // Now we can try to encode the response + encoder.encode(session, message, encoderOut); + + // Send it directly + Queue<Object> bufferQueue = ((AbstractProtocolEncoderOutput) encoderOut).getMessageQueue(); + + // Write all the encoded messages now + while (!bufferQueue.isEmpty()) { + Object encodedMessage = bufferQueue.poll(); + + if (encodedMessage == null) { + break; + } + + SocketAddress destination = writeRequest.getDestination(); + WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination); + nextFilter.filterWrite(session, encodedWriteRequest); + } + } catch (Exception e) { + ProtocolEncoderException pee; + + // Generate the correct exception + if (e instanceof ProtocolEncoderException) { + pee = (ProtocolEncoderException) e; + } else { + pee = new ProtocolEncoderException(e); + } + + throw pee; + } + } + + /** + * {@inheritDoc} + */ + @Override + public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception { + // Call finishDecode() first when a connection is closed. + ProtocolDecoder decoder = factory.getDecoder(session); + ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter); + + try { + decoder.finishDecode(session, decoderOut); + } catch (Exception e) { + ProtocolDecoderException pde; + if (e instanceof ProtocolDecoderException) { + pde = (ProtocolDecoderException) e; + } else { + pde = new ProtocolDecoderException(e); + } + throw pde; + } finally { + // Dispose everything + disposeCodec(session); + decoderOut.flush(nextFilter, session); + } + + // Call the next filter + nextFilter.sessionClosed(session); + } + + private static class EncodedWriteRequest extends DefaultWriteRequest { + public EncodedWriteRequest(Object encodedMessage, WriteFuture future, SocketAddress destination) { + super(encodedMessage, future, destination); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isEncoded() { + return true; + } + } + + private static class ProtocolDecoderOutputImpl extends AbstractProtocolDecoderOutput { + public ProtocolDecoderOutputImpl() { + // Do nothing + } + + /** + * {@inheritDoc} + */ + @Override + public void flush(NextFilter nextFilter, IoSession session) { + Queue<Object> messageQueue = getMessageQueue(); + + while (!messageQueue.isEmpty()) { + nextFilter.messageReceived(session, messageQueue.poll()); + } + } + } + + private static class ProtocolEncoderOutputImpl extends AbstractProtocolEncoderOutput { + private final IoSession session; + + private final NextFilter nextFilter; + + /** The WriteRequest destination */ + private final SocketAddress destination; + + public ProtocolEncoderOutputImpl(IoSession session, NextFilter nextFilter, WriteRequest writeRequest) { + this.session = session; + this.nextFilter = nextFilter; + + // Only store the destination, not the full WriteRequest. + destination = writeRequest.getDestination(); + } + + /** + * {@inheritDoc} + */ + @Override + public WriteFuture flush() { + Queue<Object> bufferQueue = getMessageQueue(); + WriteFuture future = null; + + while (!bufferQueue.isEmpty()) { + Object encodedMessage = bufferQueue.poll(); + + if (encodedMessage == null) { + break; + } + + // Flush only when the buffer has remaining. + if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) { + future = new DefaultWriteFuture(session); + nextFilter.filterWrite(session, new EncodedWriteRequest(encodedMessage, future, destination)); + } + } + + if (future == null) { + // Creates an empty writeRequest containing the destination + future = DefaultWriteFuture.newNotWrittenFuture(session, + new NothingWrittenException(AbstractIoSession.MESSAGE_SENT_REQUEST)); + } + + return future; + } + } + + // ----------- Helper methods --------------------------------------------- + /** + * Dispose the encoder, decoder, and the callback for the decoded messages. + */ + private void disposeCodec(IoSession session) { + // We just remove the two instances of encoder/decoder to release resources + // from the session + disposeEncoder(session); + disposeDecoder(session); + + // We also remove the callback + disposeDecoderOut(session); + } + + /** + * Dispose the encoder, removing its instance from the session's attributes, and + * calling the associated dispose method. + */ + private void disposeEncoder(IoSession session) { + ProtocolEncoder encoder = (ProtocolEncoder) session.removeAttribute(ENCODER); + if (encoder == null) { + return; + } + + try { + encoder.dispose(session); + } catch (Exception e) { + LOGGER.warn("Failed to dispose: " + encoder.getClass().getName() + " (" + encoder + ')'); + } + } + + /** + * Dispose the decoder, removing its instance from the session's attributes, and + * calling the associated dispose method. + */ + private void disposeDecoder(IoSession session) { + ProtocolDecoder decoder = (ProtocolDecoder) session.removeAttribute(DECODER); + if (decoder == null) { + return; + } + + try { + decoder.dispose(session); + } catch (Exception e) { + LOGGER.warn("Failed to dispose: " + decoder.getClass().getName() + " (" + decoder + ')'); + } + } + + /** + * Return a reference to the decoder callback. If it's not already created and + * stored into the session, we create a new instance. + */ + private ProtocolDecoderOutput getDecoderOut(IoSession session, NextFilter nextFilter) { + ProtocolDecoderOutput out = (ProtocolDecoderOutput) session.getAttribute(DECODER_OUT); + + if (out == null) { + // Create a new instance, and stores it into the session + out = new ProtocolDecoderOutputImpl(); + session.setAttribute(DECODER_OUT, out); + } + + return out; + } + + private ProtocolEncoderOutput getEncoderOut(IoSession session, NextFilter nextFilter, WriteRequest writeRequest) { + ProtocolEncoderOutput out = (ProtocolEncoderOutput) session.getAttribute(ENCODER_OUT); + + if (out == null) { + // Create a new instance, and stores it into the session + out = new ProtocolEncoderOutputImpl(session, nextFilter, writeRequest); + session.setAttribute(ENCODER_OUT, out); + } + + return out; + } + + /** + * Remove the decoder callback from the session's attributes. + */ + private void disposeDecoderOut(IoSession session) { + session.removeAttribute(DECODER_OUT); + } }