Author: davsclaus
Date: Wed Jun 13 09:25:16 2012
New Revision: 1349704

URL: http://svn.apache.org/viewvc?rev=1349704&view=rev
Log:
CAMEL-5225: Configured encoders and decoders must be shareable or implement 
ChannelHandlerFactory to be safely used with Netty.

Added:
    
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ChannelHandlerFactories.java
    
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ChannelHandlerFactory.java
    
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ShareableChannelHandlerFactory.java
    
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/UnsharableCodecsConflicts2Test.java
Modified:
    
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java
    
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java
    
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java
    
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
    
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/MultipleCodecsTest.java
    
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/UnsharableCodecsConflictsTest.java
    
camel/trunk/components/camel-netty/src/test/resources/org/apache/camel/component/netty/multiple-codecs.xml

Added: 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ChannelHandlerFactories.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ChannelHandlerFactories.java?rev=1349704&view=auto
==============================================================================
--- 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ChannelHandlerFactories.java
 (added)
+++ 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ChannelHandlerFactories.java
 Wed Jun 13 09:25:16 2012
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import java.nio.charset.Charset;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.ChannelHandler;
+import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
+import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
+import org.jboss.netty.handler.codec.serialization.ClassResolvers;
+import org.jboss.netty.handler.codec.serialization.ObjectDecoder;
+import org.jboss.netty.handler.codec.serialization.ObjectEncoder;
+import org.jboss.netty.handler.codec.string.StringDecoder;
+import org.jboss.netty.handler.codec.string.StringEncoder;
+
+/**
+ * Helper to create commonly used {@link ChannelHandlerFactory} instances.
+ */
+public final class ChannelHandlerFactories {
+
+    private ChannelHandlerFactories() {
+    }
+
+    public static ChannelHandlerFactory newStringEncoder(Charset charset) {
+        return new ShareableChannelHandlerFactory(new StringEncoder(charset));
+    }
+
+    public static ChannelHandlerFactory newStringDecoder(Charset charset) {
+        return new ShareableChannelHandlerFactory(new StringDecoder(charset));
+    }
+
+    public static ChannelHandlerFactory newObjectDecoder() {
+        return new ChannelHandlerFactory() {
+            @Override
+            public ChannelHandler newChannelHandler() {
+                return new 
ObjectDecoder(ClassResolvers.weakCachingResolver(null));
+            }
+        };
+    }
+
+    public static ChannelHandlerFactory newObjectEncoder() {
+        return new ShareableChannelHandlerFactory(new ObjectEncoder());
+    }
+
+    public static ChannelHandlerFactory newDelimiterBasedFrameDecoder(final 
int maxFrameLength, final ChannelBuffer[] delimiters) {
+        return new ChannelHandlerFactory() {
+            @Override
+            public ChannelHandler newChannelHandler() {
+                return new DelimiterBasedFrameDecoder(maxFrameLength, true, 
delimiters);
+            }
+        };
+    }
+
+    public static ChannelHandlerFactory newLengthFieldBasedFrameDecoder(final 
int maxFrameLength, final int lengthFieldOffset,
+                                                                        final 
int lengthFieldLength, final int lengthAdjustment,
+                                                                        final 
int initialBytesToStrip) {
+        return new ChannelHandlerFactory() {
+            @Override
+            public ChannelHandler newChannelHandler() {
+                return new LengthFieldBasedFrameDecoder(maxFrameLength, 
lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
+            }
+        };
+    }
+
+}

Added: 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ChannelHandlerFactory.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ChannelHandlerFactory.java?rev=1349704&view=auto
==============================================================================
--- 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ChannelHandlerFactory.java
 (added)
+++ 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ChannelHandlerFactory.java
 Wed Jun 13 09:25:16 2012
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import org.jboss.netty.channel.ChannelHandler;
+
+/**
+ * Factory for creating new {@link ChannelHandler} used for non shareable
+ * encoders and decoders configured on the Camel {@link NettyComponent}.
+ * <p/>
+ * This is needed as Netty's {@link ChannelHandler} is often not shareable
+ * and therefore a new instance must be created when a handler is being
+ * added to a pipeline.
+ */
+public interface ChannelHandlerFactory extends ChannelHandler {
+
+    /**
+     * Creates a new {@link ChannelHandler} to be used.
+     */
+    ChannelHandler newChannelHandler();
+}

Modified: 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java?rev=1349704&r1=1349703&r2=1349704&view=diff
==============================================================================
--- 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java
 (original)
+++ 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java
 Wed Jun 13 09:25:16 2012
@@ -22,9 +22,8 @@ import javax.net.ssl.SSLEngine;
 
 import org.apache.camel.component.netty.handlers.ClientChannelHandler;
 import org.apache.camel.component.netty.ssl.SSLEngineFactory;
-import org.jboss.netty.channel.ChannelDownstreamHandler;
+import org.jboss.netty.channel.ChannelHandler;
 import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelUpstreamHandler;
 import org.jboss.netty.channel.Channels;
 import org.jboss.netty.handler.ssl.SslHandler;
 import org.slf4j.Logger;
@@ -46,25 +45,40 @@ public class DefaultClientPipelineFactor
         SslHandler sslHandler = configureClientSSLOnDemand(producer);
         if (sslHandler != null) {
             LOG.debug("Client SSL handler configured and added to the 
ChannelPipeline");
-            channelPipeline.addLast("ssl", sslHandler);
+            addToPipeline("ssl", channelPipeline, sslHandler);
         }
 
-        List<ChannelUpstreamHandler> decoders = 
producer.getConfiguration().getDecoders();
+        List<ChannelHandler> decoders = 
producer.getConfiguration().getDecoders();
         for (int x = 0; x < decoders.size(); x++) {
-            channelPipeline.addLast("decoder-" + x, decoders.get(x));
+            ChannelHandler decoder = decoders.get(x);
+            if (decoder instanceof ChannelHandlerFactory) {
+                // use the factory to create a new instance of the channel 
handler, as it may not be shareable
+                decoder = ((ChannelHandlerFactory) 
decoder).newChannelHandler();
+            }
+            addToPipeline("decoder-" + x, channelPipeline, decoder);
         }
 
-        List<ChannelDownstreamHandler> encoders = 
producer.getConfiguration().getEncoders();
+        List<ChannelHandler> encoders = 
producer.getConfiguration().getEncoders();
         for (int x = 0; x < encoders.size(); x++) {
-            channelPipeline.addLast("encoder-" + x, encoders.get(x));
+            ChannelHandler encoder = encoders.get(x);
+            if (encoder instanceof ChannelHandlerFactory) {
+                // use the factory to create a new instance of the channel 
handler, as it may not be shareable
+                encoder = ((ChannelHandlerFactory) 
encoder).newChannelHandler();
+            }
+            addToPipeline("encoder-" + x, channelPipeline, encoder);
         }
 
         // our handler must be added last
-        channelPipeline.addLast("handler", new ClientChannelHandler(producer));
+        addToPipeline("handler", channelPipeline, new 
ClientChannelHandler(producer));
 
+        LOG.trace("Created ChannelPipeline: {}", channelPipeline);
         return channelPipeline;
     }
 
+    private void addToPipeline(String name, ChannelPipeline pipeline, 
ChannelHandler handler) {
+        pipeline.addLast(name, handler);
+    }
+
     private SslHandler configureClientSSLOnDemand(NettyProducer producer) 
throws Exception {
         if (!producer.getConfiguration().isSsl()) {
             return null;

Modified: 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java?rev=1349704&r1=1349703&r2=1349704&view=diff
==============================================================================
--- 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java
 (original)
+++ 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java
 Wed Jun 13 09:25:16 2012
@@ -22,9 +22,8 @@ import javax.net.ssl.SSLEngine;
 
 import org.apache.camel.component.netty.handlers.ServerChannelHandler;
 import org.apache.camel.component.netty.ssl.SSLEngineFactory;
-import org.jboss.netty.channel.ChannelDownstreamHandler;
+import org.jboss.netty.channel.ChannelHandler;
 import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelUpstreamHandler;
 import org.jboss.netty.channel.Channels;
 import org.jboss.netty.handler.ssl.SslHandler;
 import org.slf4j.Logger;
@@ -46,24 +45,40 @@ public class DefaultServerPipelineFactor
         SslHandler sslHandler = configureServerSSLOnDemand(consumer);
         if (sslHandler != null) {
             LOG.debug("Server SSL handler configured and added as an 
interceptor against the ChannelPipeline");
-            channelPipeline.addLast("ssl", sslHandler);            
+            addToPipeline("ssl", channelPipeline, sslHandler);
         }
-        List<ChannelDownstreamHandler> encoders = 
consumer.getConfiguration().getEncoders();
+
+        List<ChannelHandler> encoders = 
consumer.getConfiguration().getEncoders();
         for (int x = 0; x < encoders.size(); x++) {
-            channelPipeline.addLast("encoder-" + x, encoders.get(x));
+            ChannelHandler encoder = encoders.get(x);
+            if (encoder instanceof ChannelHandlerFactory) {
+                // use the factory to create a new instance of the channel 
handler, as it may not be shareable
+                encoder = ((ChannelHandlerFactory) 
encoder).newChannelHandler();
+            }
+            addToPipeline("encoder-" + x, channelPipeline, encoder);
         }
 
-        List<ChannelUpstreamHandler> decoders = 
consumer.getConfiguration().getDecoders();
+        List<ChannelHandler> decoders = 
consumer.getConfiguration().getDecoders();
         for (int x = 0; x < decoders.size(); x++) {
-            channelPipeline.addLast("decoder-" + x, decoders.get(x));
+            ChannelHandler decoder = decoders.get(x);
+            if (decoder instanceof ChannelHandlerFactory) {
+                // use the factory to create a new instance of the channel 
handler, as it may not be shareable
+                decoder = ((ChannelHandlerFactory) 
decoder).newChannelHandler();
+            }
+            addToPipeline("decoder-" + x, channelPipeline, decoder);
         }
 
         // our handler must be added last
-        channelPipeline.addLast("handler", new ServerChannelHandler(consumer));
+        addToPipeline("handler", channelPipeline, new 
ServerChannelHandler(consumer));
 
+        LOG.trace("Created ChannelPipeline: {}", channelPipeline);
         return channelPipeline;
     }
-    
+
+    private void addToPipeline(String name, ChannelPipeline pipeline, 
ChannelHandler handler) {
+        pipeline.addLast(name, handler);
+    }
+
     private SslHandler configureServerSSLOnDemand(NettyConsumer consumer) 
throws Exception {
         if (!consumer.getConfiguration().isSsl()) {
             return null;

Modified: 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java?rev=1349704&r1=1349703&r2=1349704&view=diff
==============================================================================
--- 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java
 (original)
+++ 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java
 Wed Jun 13 09:25:16 2012
@@ -48,6 +48,9 @@ public class NettyComponent extends Defa
 
         config.parseURI(new URI(remaining), parameters, this);
 
+        // validate config
+        config.validateConfiguration();
+
         NettyEndpoint nettyEndpoint = new NettyEndpoint(remaining, this, 
config);
         nettyEndpoint.setTimer(getTimer());
         setProperties(nettyEndpoint.getConfiguration(), parameters);

Modified: 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java?rev=1349704&r1=1349703&r2=1349704&view=diff
==============================================================================
--- 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
 (original)
+++ 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
 Wed Jun 13 09:25:16 2012
@@ -26,16 +26,11 @@ import java.util.Map;
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.util.EndpointHelper;
+import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.jsse.SSLContextParameters;
-import org.jboss.netty.channel.ChannelDownstreamHandler;
-import org.jboss.netty.channel.ChannelUpstreamHandler;
-import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.ChannelHandler;
 import org.jboss.netty.handler.codec.frame.Delimiters;
-import org.jboss.netty.handler.codec.serialization.ClassResolvers;
-import org.jboss.netty.handler.codec.serialization.ObjectDecoder;
-import org.jboss.netty.handler.codec.serialization.ObjectEncoder;
-import org.jboss.netty.handler.codec.string.StringDecoder;
-import org.jboss.netty.handler.codec.string.StringEncoder;
 import org.jboss.netty.handler.ssl.SslHandler;
 import org.jboss.netty.util.CharsetUtil;
 import org.slf4j.Logger;
@@ -62,8 +57,8 @@ public class NettyConfiguration implemen
     private File keyStoreFile;
     private File trustStoreFile;
     private SslHandler sslHandler;
-    private List<ChannelDownstreamHandler> encoders = new 
ArrayList<ChannelDownstreamHandler>();
-    private List<ChannelUpstreamHandler> decoders = new 
ArrayList<ChannelUpstreamHandler>();
+    private List<ChannelHandler> encoders = new ArrayList<ChannelHandler>();
+    private List<ChannelHandler> decoders = new ArrayList<ChannelHandler>();
     private boolean ssl;
     private long sendBufferSize = 65536;
     private long receiveBufferSize = 65536;
@@ -84,13 +79,14 @@ public class NettyConfiguration implemen
     /**
      * Returns a copy of this configuration
      */
+    @SuppressWarnings("unchecked")
     public NettyConfiguration copy() {
         try {
             NettyConfiguration answer = (NettyConfiguration) clone();
             // make sure the lists is copied in its own instance
-            List<ChannelDownstreamHandler> encodersCopy = new 
ArrayList<ChannelDownstreamHandler>(encoders);
+            List encodersCopy = new ArrayList(encoders);
             answer.setEncoders(encodersCopy);
-            List<ChannelUpstreamHandler> decodersCopy = new 
ArrayList<ChannelUpstreamHandler>(decoders);
+            List decodersCopy = new ArrayList(decoders);
             answer.setDecoders(decodersCopy);
             return answer;
         } catch (CloneNotSupportedException e) {
@@ -98,6 +94,37 @@ public class NettyConfiguration implemen
         }
     }
 
+    public void validateConfiguration() {
+        // validate that each encoder is either shareable or is a handler 
factory
+        for (ChannelHandler encoder : encoders) {
+            if (encoder instanceof ChannelHandlerFactory) {
+                continue;
+            }
+            if (ObjectHelper.getAnnotation(encoder, 
ChannelHandler.Sharable.class) != null) {
+                continue;
+            }
+            LOG.warn("The encoder {} is not @Shareable or an 
ChannelHandlerFactory instance. The encoder cannot safely be used.", encoder);
+        }
+
+        // validate that each decoder is either shareable or is a handler 
factory
+        for (ChannelHandler decoder : decoders) {
+            if (decoder instanceof ChannelHandlerFactory) {
+                continue;
+            }
+            if (ObjectHelper.getAnnotation(decoder, 
ChannelHandler.Sharable.class) != null) {
+                continue;
+            }
+            LOG.warn("The decoder {} is not @Shareable or an 
ChannelHandlerFactory instance. The decoder cannot safely be used.", decoder);
+        }
+        if (sslHandler != null) {
+            boolean factory = sslHandler instanceof ChannelHandlerFactory;
+            boolean shareable = ObjectHelper.getAnnotation(sslHandler, 
ChannelHandler.Sharable.class) != null;
+            if (!factory && !shareable) {
+                LOG.warn("The sslHandler {} is not @Shareable or an 
ChannelHandlerFactory instance. The sslHandler cannot safely be used.", 
sslHandler);
+            }
+        }
+    }
+
     public void parseURI(URI uri, Map<String, Object> parameters, 
NettyComponent component) throws Exception {
         protocol = uri.getScheme();
 
@@ -118,10 +145,10 @@ public class NettyConfiguration implemen
         serverPipelineFactory = 
component.resolveAndRemoveReferenceParameter(parameters, 
"serverPipelineFactory", ServerPipelineFactory.class, null);
 
         // set custom encoders and decoders first
-        List<ChannelDownstreamHandler> referencedEncoders = 
component.resolveAndRemoveReferenceListParameter(parameters, "encoders", 
ChannelDownstreamHandler.class, null);
-        addToHandlersList(encoders, referencedEncoders, 
ChannelDownstreamHandler.class);
-        List<ChannelUpstreamHandler> referencedDecoders = 
component.resolveAndRemoveReferenceListParameter(parameters, "decoders", 
ChannelUpstreamHandler.class, null);
-        addToHandlersList(decoders, referencedDecoders, 
ChannelUpstreamHandler.class);
+        List<ChannelHandler> referencedEncoders = 
component.resolveAndRemoveReferenceListParameter(parameters, "encoders", 
ChannelHandler.class, null);
+        addToHandlersList(encoders, referencedEncoders, ChannelHandler.class);
+        List<ChannelHandler> referencedDecoders = 
component.resolveAndRemoveReferenceListParameter(parameters, "decoders", 
ChannelHandler.class, null);
+        addToHandlersList(decoders, referencedDecoders, ChannelHandler.class);
 
         // then set parameters with the help of the camel context type 
converters
         EndpointHelper.setReferenceProperties(component.getCamelContext(), 
this, parameters);
@@ -133,9 +160,10 @@ public class NettyConfiguration implemen
                 // are we textline or object?
                 if (isTextline()) {
                     Charset charset = getEncoding() != null ? 
Charset.forName(getEncoding()) : CharsetUtil.UTF_8;
-                    encoders.add(new StringEncoder(charset));
-                    decoders.add(new 
DelimiterBasedFrameDecoder(decoderMaxLineLength, true, delimiter == 
TextLineDelimiter.LINE ? Delimiters.lineDelimiter() : 
Delimiters.nulDelimiter()));
-                    decoders.add(new StringDecoder(charset));
+                    
encoders.add(ChannelHandlerFactories.newStringEncoder(charset));
+                    ChannelBuffer[] delimiters = delimiter == 
TextLineDelimiter.LINE ? Delimiters.lineDelimiter() : Delimiters.nulDelimiter();
+                    
decoders.add(ChannelHandlerFactories.newDelimiterBasedFrameDecoder(decoderMaxLineLength,
 delimiters));
+                    
decoders.add(ChannelHandlerFactories.newStringDecoder(charset));
 
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("Using textline encoders and decoders with 
charset: {}, delimiter: {} and decoderMaxLineLength: {}", 
@@ -143,8 +171,8 @@ public class NettyConfiguration implemen
                     }
                 } else {
                     // object serializable is then used
-                    encoders.add(new ObjectEncoder());
-                    decoders.add(new 
ObjectDecoder(ClassResolvers.weakCachingResolver(null)));
+                    encoders.add(ChannelHandlerFactories.newObjectEncoder());
+                    decoders.add(ChannelHandlerFactories.newObjectDecoder());
 
                     LOG.debug("Using object encoders and decoders");
                 }
@@ -291,42 +319,42 @@ public class NettyConfiguration implemen
         this.sslHandler = sslHandler;
     }
 
-    public List<ChannelDownstreamHandler> getEncoders() {
+    public List<ChannelHandler> getDecoders() {
+        return decoders;
+    }
+
+    public void setDecoders(List<ChannelHandler> decoders) {
+        this.decoders = decoders;
+    }
+
+    public List<ChannelHandler> getEncoders() {
         return encoders;
     }
 
-    public List<ChannelUpstreamHandler> getDecoders() {
-        return decoders;
+    public void setEncoders(List<ChannelHandler> encoders) {
+        this.encoders = encoders;
     }
 
-    public ChannelDownstreamHandler getEncoder() {
+    public ChannelHandler getEncoder() {
         return encoders.isEmpty() ? null : encoders.get(0);
     }
 
-    public void setEncoder(ChannelDownstreamHandler encoder) {
+    public void setEncoder(ChannelHandler encoder) {
         if (!encoders.contains(encoder)) {
             encoders.add(encoder);
         }
     }
 
-    public void setEncoders(List<ChannelDownstreamHandler> encoders) {
-        this.encoders = encoders;
-    }
-
-    public ChannelUpstreamHandler getDecoder() {
+    public ChannelHandler getDecoder() {
         return decoders.isEmpty() ? null : decoders.get(0);
     }
 
-    public void setDecoder(ChannelUpstreamHandler decoder) {
+    public void setDecoder(ChannelHandler decoder) {
         if (!decoders.contains(decoder)) {
             decoders.add(decoder);
         }
     }
 
-    public void setDecoders(List<ChannelUpstreamHandler> decoders) {
-        this.decoders = decoders;
-    }
-
     public long getSendBufferSize() {
         return sendBufferSize;
     }
@@ -451,17 +479,6 @@ public class NettyConfiguration implemen
         return host + ":" + port;
     }
 
-    private <T> void addToHandlersList(List<T> configured, List<T> handlers, 
Class<T> handlerType) {
-        if (handlers != null) {
-            for (int x = 0; x < handlers.size(); x++) {
-                T handler = handlers.get(x);
-                if (handlerType.isInstance(handler)) {
-                    configured.add(handler);
-                }
-            }
-        }
-    }
-
     public void setClientPipelineFactory(ClientPipelineFactory 
clientPipelineFactory) {
         this.clientPipelineFactory = clientPipelineFactory;
     }
@@ -477,7 +494,7 @@ public class NettyConfiguration implemen
     public ServerPipelineFactory getServerPipelineFactory() {
         return serverPipelineFactory;
     }
-    
+
     public int getWorkerCount() {
         return workerCount;
     }
@@ -493,4 +510,15 @@ public class NettyConfiguration implemen
     public void setSslContextParameters(SSLContextParameters 
sslContextParameters) {
         this.sslContextParameters = sslContextParameters;
     }
+
+    private static <T> void addToHandlersList(List<T> configured, List<T> 
handlers, Class<T> handlerType) {
+        if (handlers != null) {
+            for (int x = 0; x < handlers.size(); x++) {
+                T handler = handlers.get(x);
+                if (handlerType.isInstance(handler)) {
+                    configured.add(handler);
+                }
+            }
+        }
+    }
 }

Added: 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ShareableChannelHandlerFactory.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ShareableChannelHandlerFactory.java?rev=1349704&view=auto
==============================================================================
--- 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ShareableChannelHandlerFactory.java
 (added)
+++ 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ShareableChannelHandlerFactory.java
 Wed Jun 13 09:25:16 2012
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import org.jboss.netty.channel.ChannelHandler;
+
+/**
+ * A {@link ChannelHandlerFactory} returning a shareable {@link 
ChannelHandler}.
+ */
+public class ShareableChannelHandlerFactory implements ChannelHandlerFactory {
+
+    private final ChannelHandler channelHandler;
+
+    public ShareableChannelHandlerFactory(ChannelHandler channelHandler) {
+        this.channelHandler = channelHandler;
+    }
+
+    @Override
+    public ChannelHandler newChannelHandler() {
+        return channelHandler;
+    }
+}

Modified: 
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/MultipleCodecsTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/MultipleCodecsTest.java?rev=1349704&r1=1349703&r2=1349704&view=diff
==============================================================================
--- 
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/MultipleCodecsTest.java
 (original)
+++ 
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/MultipleCodecsTest.java
 Wed Jun 13 09:25:16 2012
@@ -24,6 +24,7 @@ import org.apache.camel.builder.RouteBui
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.impl.JndiRegistry;
 import org.jboss.netty.channel.ChannelDownstreamHandler;
+import org.jboss.netty.channel.ChannelHandler;
 import org.jboss.netty.channel.ChannelUpstreamHandler;
 import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
 import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
@@ -38,7 +39,8 @@ public class MultipleCodecsTest extends 
         JndiRegistry registry = super.createRegistry();
 
         // START SNIPPET: registry-beans
-        LengthFieldBasedFrameDecoder lengthDecoder = new 
LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4);
+        ChannelHandlerFactory lengthDecoder = 
ChannelHandlerFactories.newLengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4);
+
         StringDecoder stringDecoder = new StringDecoder();
         registry.bind("length-decoder", lengthDecoder);
         registry.bind("string-decoder", stringDecoder);
@@ -48,11 +50,11 @@ public class MultipleCodecsTest extends 
         registry.bind("length-encoder", lengthEncoder);
         registry.bind("string-encoder", stringEncoder);
 
-        List<ChannelUpstreamHandler> decoders = new 
ArrayList<ChannelUpstreamHandler>();
+        List<ChannelHandler> decoders = new ArrayList<ChannelHandler>();
         decoders.add(lengthDecoder);
         decoders.add(stringDecoder);
 
-        List<ChannelDownstreamHandler> encoders = new 
ArrayList<ChannelDownstreamHandler>();
+        List<ChannelHandler> encoders = new ArrayList<ChannelHandler>();
         encoders.add(lengthEncoder);
         encoders.add(stringEncoder);
 

Added: 
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/UnsharableCodecsConflicts2Test.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/UnsharableCodecsConflicts2Test.java?rev=1349704&view=auto
==============================================================================
--- 
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/UnsharableCodecsConflicts2Test.java
 (added)
+++ 
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/UnsharableCodecsConflicts2Test.java
 Wed Jun 13 09:25:16 2012
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.Arrays;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.jboss.netty.buffer.BigEndianHeapChannelBuffer;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class UnsharableCodecsConflicts2Test extends BaseNettyTest {
+
+    static final byte[] LENGTH_HEADER = {0x00, 0x00, 0x40, 0x00}; // 16384 bytes
+
+    private Processor processor = new P();
+    private int port;
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry registry = super.createRegistry();
+
+        // create a single decoder
+        ChannelHandlerFactory decoder = ChannelHandlerFactories.newLengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4);
+        registry.bind("length-decoder", decoder);
+
+        return registry;
+    }
+
+    @Test
+    public void unsharableCodecsConflictsTest() throws Exception {
+        byte[] data1 = new byte[8192];
+        byte[] data2 = new byte[16383];
+        Arrays.fill(data1, (byte) 0x38);
+        Arrays.fill(data2, (byte) 0x39);
+        byte[] body1 = (new String(LENGTH_HEADER) + new String(data1)).getBytes();
+        byte[] body2 = (new String(LENGTH_HEADER) + new String(data2)).getBytes();
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived(new String(data2) + "9");
+
+        Socket client1 = getSocket("localhost", port);
+        Socket client2 = getSocket("localhost", port);
+
+        // use two clients to send to the same server at the same time
+        try {
+            sendBuffer(body2, client2);
+            sendBuffer(body1, client1);
+            sendBuffer(new String("9").getBytes(), client2);
+        } catch (Exception e) {
+            log.error("", e);
+        } finally {
+            client1.close();
+            client2.close();
+        }
+
+        mock.assertIsSatisfied();
+    }
+
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                port = getPort();
+
+                from("netty:tcp://localhost:{{port}}?decoder=#length-decoder&sync=false")
+                        .process(processor)
+                        .to("mock:result");
+            }
+        };
+    }
+
+    private static Socket getSocket(String host, int port) throws IOException {
+        Socket s = new Socket(host, port);
+        s.setSoTimeout(60000);
+        return s;
+    }
+
+    public static void sendBuffer(byte[] buf, Socket server) throws Exception {
+        OutputStream netOut = server.getOutputStream();
+        OutputStream dataOut = new BufferedOutputStream(netOut);
+        try {
+            dataOut.write(buf, 0, buf.length);
+            dataOut.flush();
+        } catch (Exception e) {
+            server.close();
+            throw e;
+        }
+    }
+
+    class P implements Processor {
+
+        @Override
+        public void process(Exchange exchange) throws Exception {
+            exchange.getOut().setBody(
+                    new String(((BigEndianHeapChannelBuffer) exchange.getIn()
+                            .getBody()).array()));
+        }
+    }
+}

Modified: 
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/UnsharableCodecsConflictsTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/UnsharableCodecsConflictsTest.java?rev=1349704&r1=1349703&r2=1349704&view=diff
==============================================================================
--- 
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/UnsharableCodecsConflictsTest.java
 (original)
+++ 
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/UnsharableCodecsConflictsTest.java
 Wed Jun 13 09:25:16 2012
@@ -28,7 +28,6 @@ import org.apache.camel.component.mock.M
 import org.apache.camel.impl.JndiRegistry;
 import org.apache.camel.util.IOHelper;
 import org.jboss.netty.buffer.BigEndianHeapChannelBuffer;
-import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
 import org.junit.Test;
 
 /**
@@ -47,12 +46,11 @@ public class UnsharableCodecsConflictsTe
     protected JndiRegistry createRegistry() throws Exception {
         JndiRegistry registry = super.createRegistry();
 
-        // the decoders cannot be shared with multiple netty consumers, so we need one for each consumer
-        LengthFieldBasedFrameDecoder lengthDecoder = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4);
-        registry.bind("length-decoder", lengthDecoder);
-
-        LengthFieldBasedFrameDecoder lengthDecoder2 = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4);
-        registry.bind("length-decoder2", lengthDecoder2);
+        // we can share the decoder between multiple netty consumers, because they have the same configuration
+        // and we use a ChannelHandlerFactory
+        ChannelHandlerFactory decoder = ChannelHandlerFactories.newLengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4);
+        registry.bind("length-decoder", decoder);
+        registry.bind("length-decoder2", decoder);
 
         return registry;
     }

Modified: 
camel/trunk/components/camel-netty/src/test/resources/org/apache/camel/component/netty/multiple-codecs.xml
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/resources/org/apache/camel/component/netty/multiple-codecs.xml?rev=1349704&r1=1349703&r2=1349704&view=diff
==============================================================================
--- 
camel/trunk/components/camel-netty/src/test/resources/org/apache/camel/component/netty/multiple-codecs.xml
 (original)
+++ 
camel/trunk/components/camel-netty/src/test/resources/org/apache/camel/component/netty/multiple-codecs.xml
 Wed Jun 13 09:25:16 2012
@@ -37,7 +37,7 @@
 
     <!-- START SNIPPET: registry-beans -->
     <util:list id="decoders" list-class="java.util.LinkedList">
-        <bean class="org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder">
+        <bean class="org.apache.camel.component.netty.ChannelHandlerFactories" factory-method="newLengthFieldBasedFrameDecoder">
             <constructor-arg value="1048576"/>
             <constructor-arg value="0"/>
             <constructor-arg value="4"/>
@@ -59,7 +59,7 @@
     </bean>
     <bean id="string-encoder" class="org.jboss.netty.handler.codec.string.StringEncoder"/>
 
-    <bean id="length-decoder" class="org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder">
+    <bean id="length-decoder" class="org.apache.camel.component.netty.ChannelHandlerFactories" factory-method="newLengthFieldBasedFrameDecoder">
         <constructor-arg value="1048576"/>
         <constructor-arg value="0"/>
         <constructor-arg value="4"/>


Reply via email to