Author: akarpe
Date: Wed Aug 18 18:48:58 2010
New Revision: 986850

URL: http://svn.apache.org/viewvc?rev=986850&view=rev
Log:
CAMEL-2713 Added a registry based option for a custom ChannelPipelineFactory

Added:
    
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java
   (with props)
    
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java
   (with props)
    
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactoryAsynchTest.java
   (with props)
    
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java
   (with props)
Modified:
    
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java
    
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
    
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
    
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
    
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java

Modified: 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java?rev=986850&r1=986849&r2=986850&view=diff
==============================================================================
--- 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java
 (original)
+++ 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java
 Wed Aug 18 18:48:58 2010
@@ -16,95 +16,53 @@
  */
 package org.apache.camel.component.netty;
 
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import javax.net.ssl.SSLEngine;
-
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
-import org.apache.camel.component.netty.handlers.ClientChannelHandler;
-import org.apache.camel.component.netty.ssl.SSLEngineFactory;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.jboss.netty.channel.ChannelDownstreamHandler;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.ChannelUpstreamHandler;
 import org.jboss.netty.channel.Channels;
-import org.jboss.netty.handler.ssl.SslHandler;
-import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
 
-public class ClientPipelineFactory implements ChannelPipelineFactory {
-    private static final transient Log LOG = 
LogFactory.getLog(ClientPipelineFactory.class);
-    private final NettyProducer producer;
-    private final Exchange exchange;
-    private final AsyncCallback callback;
+public abstract class ClientPipelineFactory implements ChannelPipelineFactory {
+    protected NettyProducer producer;
+    protected Exchange exchange;
+    protected AsyncCallback callback;
 
+    public ClientPipelineFactory() {
+    }
+    
     public ClientPipelineFactory(NettyProducer producer, Exchange exchange, 
AsyncCallback callback) {
         this.producer = producer;
         this.exchange = exchange;
         this.callback = callback;
     }
-
+    
     public ChannelPipeline getPipeline() throws Exception {
-        // create a new pipeline
         ChannelPipeline channelPipeline = Channels.pipeline();
+        return channelPipeline;
+    }
 
-        SslHandler sslHandler = configureClientSSLOnDemand();
-        if (sslHandler != null) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Client SSL handler configured and added to the 
ChannelPipeline");
-            }
-            channelPipeline.addLast("ssl", sslHandler);
-        }
-
-        // use read timeout handler to handle timeout while waiting for a 
remote reply (while reading from the remote host)
-        if (producer.getConfiguration().getTimeout() > 0) {
-            channelPipeline.addLast("timeout", new 
ReadTimeoutHandler(producer.getEndpoint().getTimer(), 
producer.getConfiguration().getTimeout(), TimeUnit.MILLISECONDS));
-        }
-
-        List<ChannelUpstreamHandler> decoders = 
producer.getConfiguration().getDecoders();
-        for (int x = 0; x < decoders.size(); x++) {
-            channelPipeline.addLast("decoder-" + x, decoders.get(x));
-        }
-
-        List<ChannelDownstreamHandler> encoders = 
producer.getConfiguration().getEncoders();
-        for (int x = 0; x < encoders.size(); x++) {
-            channelPipeline.addLast("encoder-" + x, encoders.get(x));
-        }
+    public NettyProducer getProducer() {
+        return producer;
+    }
 
-        // our handler must be added last
-        channelPipeline.addLast("handler", new ClientChannelHandler(producer, 
exchange, callback));
+    public void setProducer(NettyProducer producer) {
+        this.producer = producer;
+    }
 
-        return channelPipeline;
+    public Exchange getExchange() {
+        return exchange;
+    }
+
+    public void setExchange(Exchange exchange) {
+        this.exchange = exchange;
     }
 
-    private SslHandler configureClientSSLOnDemand() throws Exception {
-        if (!producer.getConfiguration().isSsl()) {
-            return null;
-        }
-
-        if (producer.getConfiguration().getSslHandler() != null) {
-            return producer.getConfiguration().getSslHandler();
-        } else {
-            if (producer.getConfiguration().getKeyStoreFile() == null) {
-                LOG.debug("keystorefile is null");
-            }
-            if (producer.getConfiguration().getTrustStoreFile() == null) {
-                LOG.debug("truststorefile is null");
-            }
-            if (producer.getConfiguration().getPassphrase().toCharArray() == 
null) {
-                LOG.debug("passphrase is null");
-            }
-            SSLEngineFactory sslEngineFactory = new SSLEngineFactory(
-                producer.getConfiguration().getKeyStoreFormat(),
-                producer.getConfiguration().getSecurityProvider(),
-                producer.getConfiguration().getKeyStoreFile(),
-                producer.getConfiguration().getTrustStoreFile(),
-                producer.getConfiguration().getPassphrase().toCharArray());
-            SSLEngine sslEngine = sslEngineFactory.createClientSSLEngine();
-            return new SslHandler(sslEngine);
-        }
+    public AsyncCallback getCallback() {
+        return callback;
+    }
+
+    public void setCallback(AsyncCallback callback) {
+        this.callback = callback;
     }
 
 }

Added: 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java?rev=986850&view=auto
==============================================================================
--- 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java
 (added)
+++ 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java
 Wed Aug 18 18:48:58 2010
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import javax.net.ssl.SSLEngine;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.netty.handlers.ClientChannelHandler;
+import org.apache.camel.component.netty.ssl.SSLEngineFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.channel.ChannelDownstreamHandler;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelUpstreamHandler;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.handler.ssl.SslHandler;
+import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
+
+public class DefaultClientPipelineFactory extends ClientPipelineFactory {
+    private static final transient Log LOG = 
LogFactory.getLog(ClientPipelineFactory.class);
+
+    public DefaultClientPipelineFactory(NettyProducer producer, Exchange 
exchange, AsyncCallback callback) {
+        super(producer, exchange, callback);
+    }
+
+    public ChannelPipeline getPipeline() throws Exception {
+        // create a new pipeline
+        ChannelPipeline channelPipeline = Channels.pipeline();
+
+        SslHandler sslHandler = configureClientSSLOnDemand();
+        if (sslHandler != null) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Client SSL handler configured and added to the 
ChannelPipeline");
+            }
+            channelPipeline.addLast("ssl", sslHandler);
+        }
+
+        // use read timeout handler to handle timeout while waiting for a 
remote reply (while reading from the remote host)
+        if (producer.getConfiguration().getTimeout() > 0) {
+            channelPipeline.addLast("timeout", new 
ReadTimeoutHandler(producer.getEndpoint().getTimer(), 
producer.getConfiguration().getTimeout(), TimeUnit.MILLISECONDS));
+        }
+
+        List<ChannelUpstreamHandler> decoders = 
producer.getConfiguration().getDecoders();
+        for (int x = 0; x < decoders.size(); x++) {
+            channelPipeline.addLast("decoder-" + x, decoders.get(x));
+        }
+
+        List<ChannelDownstreamHandler> encoders = 
producer.getConfiguration().getEncoders();
+        for (int x = 0; x < encoders.size(); x++) {
+            channelPipeline.addLast("encoder-" + x, encoders.get(x));
+        }
+
+        // our handler must be added last
+        channelPipeline.addLast("handler", new ClientChannelHandler(producer, 
exchange, callback));
+
+        return channelPipeline;
+    }
+
+    private SslHandler configureClientSSLOnDemand() throws Exception {
+        if (!producer.getConfiguration().isSsl()) {
+            return null;
+        }
+
+        if (producer.getConfiguration().getSslHandler() != null) {
+            return producer.getConfiguration().getSslHandler();
+        } else {
+            if (producer.getConfiguration().getKeyStoreFile() == null) {
+                LOG.debug("keystorefile is null");
+            }
+            if (producer.getConfiguration().getTrustStoreFile() == null) {
+                LOG.debug("truststorefile is null");
+            }
+            if (producer.getConfiguration().getPassphrase().toCharArray() == 
null) {
+                LOG.debug("passphrase is null");
+            }
+            SSLEngineFactory sslEngineFactory = new SSLEngineFactory(
+                producer.getConfiguration().getKeyStoreFormat(),
+                producer.getConfiguration().getSecurityProvider(),
+                producer.getConfiguration().getKeyStoreFile(),
+                producer.getConfiguration().getTrustStoreFile(),
+                producer.getConfiguration().getPassphrase().toCharArray());
+            SSLEngine sslEngine = sslEngineFactory.createClientSSLEngine();
+            return new SslHandler(sslEngine);
+        }
+    }
+
+}

Propchange: 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java?rev=986850&view=auto
==============================================================================
--- 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java
 (added)
+++ 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java
 Wed Aug 18 18:48:58 2010
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import java.util.List;
+import javax.net.ssl.SSLEngine;
+
+import org.apache.camel.component.netty.handlers.ServerChannelHandler;
+import org.apache.camel.component.netty.ssl.SSLEngineFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.channel.ChannelDownstreamHandler;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.ChannelUpstreamHandler;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.handler.ssl.SslHandler;
+
+public class DefaultServerPipelineFactory implements ChannelPipelineFactory {
+    private static final transient Log LOG = 
LogFactory.getLog(DefaultServerPipelineFactory.class);
+    private NettyConsumer consumer;
+        
+    public DefaultServerPipelineFactory(NettyConsumer consumer) {
+        this.consumer = consumer; 
+    }    
+
+    public ChannelPipeline getPipeline() throws Exception {
+        ChannelPipeline channelPipeline = Channels.pipeline();
+
+        SslHandler sslHandler = configureServerSSLOnDemand();
+        if (sslHandler != null) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Server SSL handler configured and added as an 
interceptor against the ChannelPipeline");
+            }
+            channelPipeline.addLast("ssl", sslHandler);            
+        }
+        List<ChannelDownstreamHandler> encoders = 
consumer.getConfiguration().getEncoders();
+        for (int x = 0; x < encoders.size(); x++) {
+            channelPipeline.addLast("encoder-" + x, encoders.get(x));
+        }
+
+        List<ChannelUpstreamHandler> decoders = 
consumer.getConfiguration().getDecoders();
+        for (int x = 0; x < decoders.size(); x++) {
+            channelPipeline.addLast("decoder-" + x, decoders.get(x));
+        }
+
+        // our handler must be added last
+        channelPipeline.addLast("handler", new ServerChannelHandler(consumer));
+
+        return channelPipeline;
+    }
+    
+    private SslHandler configureServerSSLOnDemand() throws Exception {
+        if (!consumer.getConfiguration().isSsl()) {
+            return null;
+        }
+
+        if (consumer.getConfiguration().getSslHandler() != null) {
+            return consumer.getConfiguration().getSslHandler();
+        } else {
+            SSLEngineFactory sslEngineFactory = new SSLEngineFactory(
+                consumer.getConfiguration().getKeyStoreFormat(),
+                consumer.getConfiguration().getSecurityProvider(),
+                consumer.getConfiguration().getKeyStoreFile(), 
+                consumer.getConfiguration().getTrustStoreFile(), 
+                consumer.getConfiguration().getPassphrase().toCharArray());
+            SSLEngine sslEngine = sslEngineFactory.createServerSSLEngine();
+            return new SslHandler(sslEngine);
+        }
+    }   
+
+}

Propchange: 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java?rev=986850&r1=986849&r2=986850&view=diff
==============================================================================
--- 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
 (original)
+++ 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
 Wed Aug 18 18:48:58 2010
@@ -77,7 +77,9 @@ public class NettyConfiguration implemen
     private boolean disconnectOnNoReply = true;
     private LoggingLevel noReplyLogLevel = LoggingLevel.WARN;
     private boolean allowDefaultCodec = true;
-
+    private ClientPipelineFactory clientPipelineFactory;
+    private ServerPipelineFactory serverPipelineFactory;
+    
     /**
      * Returns a copy of this configuration
      */
@@ -111,6 +113,8 @@ public class NettyConfiguration implemen
         securityProvider = component.getAndRemoveParameter(parameters, 
"securityProvider", String.class, "SunX509");
         keyStoreFile = 
component.resolveAndRemoveReferenceParameter(parameters, "keyStoreFile", 
File.class, null);
         trustStoreFile = 
component.resolveAndRemoveReferenceParameter(parameters, "trustStoreFile", 
File.class, null);
+        clientPipelineFactory = 
component.resolveAndRemoveReferenceParameter(parameters, 
"clientPipelineFactory", ClientPipelineFactory.class, null);
+        serverPipelineFactory = 
component.resolveAndRemoveReferenceParameter(parameters, 
"serverPipelineFactory", ServerPipelineFactory.class, null);
 
         // set custom encoders and decoders first
         List<ChannelDownstreamHandler> referencedEncoders = 
component.resolveAndRemoveReferenceListParameter(parameters, "encoders", 
ChannelDownstreamHandler.class, null);
@@ -476,4 +480,20 @@ public class NettyConfiguration implemen
         }
     }
 
+    public void setClientPipelineFactory(ClientPipelineFactory 
clientPipelineFactory) {
+        this.clientPipelineFactory = clientPipelineFactory;
+    }
+
+    public ClientPipelineFactory getClientPipelineFactory() {
+        return clientPipelineFactory;
+    }
+
+    public void setServerPipelineFactory(ServerPipelineFactory 
serverPipelineFactory) {
+        this.serverPipelineFactory = serverPipelineFactory;
+    }
+
+    public ServerPipelineFactory getServerPipelineFactory() {
+        return serverPipelineFactory;
+    }
+
 }

Modified: 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java?rev=986850&r1=986849&r2=986850&view=diff
==============================================================================
--- 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
 (original)
+++ 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
 Wed Aug 18 18:48:58 2010
@@ -154,7 +154,12 @@ public class NettyConsumer extends Defau
 
         channelFactory = new NioServerSocketChannelFactory(bossExecutor, 
workerExecutor);
         serverBootstrap = new ServerBootstrap(channelFactory);
-        serverBootstrap.setPipelineFactory(new ServerPipelineFactory(this));
+        if (configuration.getServerPipelineFactory() != null) {
+            configuration.getServerPipelineFactory().setConsumer(this);
+            
serverBootstrap.setPipelineFactory(configuration.getServerPipelineFactory());
+        } else {
+            serverBootstrap.setPipelineFactory(new 
DefaultServerPipelineFactory(this));
+        }
         serverBootstrap.setOption("child.keepAlive", 
configuration.isKeepAlive());
         serverBootstrap.setOption("child.tcpNoDelay", 
configuration.isTcpNoDelay());
         serverBootstrap.setOption("child.reuseAddress", 
configuration.isReuseAddress());
@@ -171,7 +176,12 @@ public class NettyConsumer extends Defau
 
         datagramChannelFactory = new NioDatagramChannelFactory(workerExecutor);
         connectionlessServerBootstrap = new 
ConnectionlessBootstrap(datagramChannelFactory);
-        connectionlessServerBootstrap.setPipelineFactory(new 
ServerPipelineFactory(this));
+        if (configuration.getServerPipelineFactory() != null) {
+            configuration.getServerPipelineFactory().setConsumer(this);
+            
connectionlessServerBootstrap.setPipelineFactory(configuration.getServerPipelineFactory());
+        } else {
+            connectionlessServerBootstrap.setPipelineFactory(new 
DefaultServerPipelineFactory(this));
+        }
         connectionlessServerBootstrap.setOption("child.keepAlive", 
configuration.isKeepAlive());
         connectionlessServerBootstrap.setOption("child.tcpNoDelay", 
configuration.isTcpNoDelay());
         connectionlessServerBootstrap.setOption("child.reuseAddress", 
configuration.isReuseAddress());

Modified: 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=986850&r1=986849&r2=986850&view=diff
==============================================================================
--- 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
 (original)
+++ 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
 Wed Aug 18 18:48:58 2010
@@ -232,11 +232,20 @@ public class NettyProducer extends Defau
 
     private ChannelFuture openConnection(Exchange exchange, AsyncCallback 
callback) throws Exception {
         ChannelFuture answer;
+        ChannelPipeline clientPipeline;
 
-        // initialize client pipeline factory
-        ClientPipelineFactory clientPipelineFactory = new 
ClientPipelineFactory(this, exchange, callback);
-        // must get the pipeline from the factory when opening a new connection
-        ChannelPipeline clientPipeline = clientPipelineFactory.getPipeline();
+        if (configuration.getClientPipelineFactory() != null) {
+            // initialize user defined client pipeline factory
+            configuration.getClientPipelineFactory().setProducer(this);
+            configuration.getClientPipelineFactory().setExchange(exchange);
+            configuration.getClientPipelineFactory().setCallback(callback);
+            clientPipeline = 
configuration.getClientPipelineFactory().getPipeline();
+        } else {
+            // initialize client pipeline factory
+            ClientPipelineFactory clientPipelineFactory = new 
DefaultClientPipelineFactory(this, exchange, callback);
+            // must get the pipeline from the factory when opening a new 
connection
+            clientPipeline = clientPipelineFactory.getPipeline();
+        }
 
         if (isTcp()) {
             ClientBootstrap clientBootstrap = new 
ClientBootstrap(channelFactory);

Modified: 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java?rev=986850&r1=986849&r2=986850&view=diff
==============================================================================
--- 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java
 (original)
+++ 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java
 Wed Aug 18 18:48:58 2010
@@ -16,71 +16,31 @@
  */
 package org.apache.camel.component.netty;
 
-import java.util.List;
-import javax.net.ssl.SSLEngine;
-
-import org.apache.camel.component.netty.handlers.ServerChannelHandler;
-import org.apache.camel.component.netty.ssl.SSLEngineFactory;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.jboss.netty.channel.ChannelDownstreamHandler;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.ChannelUpstreamHandler;
 import org.jboss.netty.channel.Channels;
-import org.jboss.netty.handler.ssl.SslHandler;
 
-public class ServerPipelineFactory implements ChannelPipelineFactory {
-    private static final transient Log LOG = 
LogFactory.getLog(ServerPipelineFactory.class);
-    private NettyConsumer consumer;
-        
+public abstract class ServerPipelineFactory implements ChannelPipelineFactory {
+    protected NettyConsumer consumer;
+      
+    public ServerPipelineFactory() {
+    }
+    
     public ServerPipelineFactory(NettyConsumer consumer) {
         this.consumer = consumer; 
     }    
 
     public ChannelPipeline getPipeline() throws Exception {
         ChannelPipeline channelPipeline = Channels.pipeline();
-
-        SslHandler sslHandler = configureServerSSLOnDemand();
-        if (sslHandler != null) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Server SSL handler configured and added as an 
interceptor against the ChannelPipeline");
-            }
-            channelPipeline.addLast("ssl", sslHandler);            
-        }
-        List<ChannelDownstreamHandler> encoders = 
consumer.getConfiguration().getEncoders();
-        for (int x = 0; x < encoders.size(); x++) {
-            channelPipeline.addLast("encoder-" + x, encoders.get(x));
-        }
-
-        List<ChannelUpstreamHandler> decoders = 
consumer.getConfiguration().getDecoders();
-        for (int x = 0; x < decoders.size(); x++) {
-            channelPipeline.addLast("decoder-" + x, decoders.get(x));
-        }
-
-        // our handler must be added last
-        channelPipeline.addLast("handler", new ServerChannelHandler(consumer));
-
         return channelPipeline;
     }
-    
-    private SslHandler configureServerSSLOnDemand() throws Exception {
-        if (!consumer.getConfiguration().isSsl()) {
-            return null;
-        }
 
-        if (consumer.getConfiguration().getSslHandler() != null) {
-            return consumer.getConfiguration().getSslHandler();
-        } else {
-            SSLEngineFactory sslEngineFactory = new SSLEngineFactory(
-                consumer.getConfiguration().getKeyStoreFormat(),
-                consumer.getConfiguration().getSecurityProvider(),
-                consumer.getConfiguration().getKeyStoreFile(), 
-                consumer.getConfiguration().getTrustStoreFile(), 
-                consumer.getConfiguration().getPassphrase().toCharArray());
-            SSLEngine sslEngine = sslEngineFactory.createServerSSLEngine();
-            return new SslHandler(sslEngine);
-        }
-    }   
+    public NettyConsumer getConsumer() {
+        return consumer;
+    }
 
+    public void setConsumer(NettyConsumer consumer) {
+        this.consumer = consumer;
+    }
+    
 }

Added: 
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactoryAsynchTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactoryAsynchTest.java?rev=986850&view=auto
==============================================================================
--- 
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactoryAsynchTest.java
 (added)
+++ 
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactoryAsynchTest.java
 Wed Aug 18 18:48:58 2010
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.netty.handlers.ClientChannelHandler;
+import org.apache.camel.component.netty.handlers.ServerChannelHandler;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.test.CamelTestSupport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.channel.ChannelDownstreamHandler;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelUpstreamHandler;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
+import org.jboss.netty.handler.codec.frame.Delimiters;
+import org.jboss.netty.handler.codec.string.StringDecoder;
+import org.jboss.netty.handler.codec.string.StringEncoder;
+import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
+import org.jboss.netty.util.CharsetUtil;
+import org.junit.Test;
+
+public class NettyCustomPipelineFactoryAsynchTest extends CamelTestSupport {
+    private static final transient Log LOG = 
LogFactory.getLog(NettyCustomPipelineFactoryAsynchTest.class);
+
+    @Produce(uri = "direct:start")
+    protected ProducerTemplate producerTemplate;
+    private TestClientChannelPipelineFactory clientPipelineFactory;
+    private TestServerChannelPipelineFactory serverPipelineFactory;
+    private String response;
+    
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry registry = new JndiRegistry(createJndiContext());
+        clientPipelineFactory = new TestClientChannelPipelineFactory();
+        serverPipelineFactory = new TestServerChannelPipelineFactory();
+        registry.bind("cpf", clientPipelineFactory);
+        registry.bind("spf", serverPipelineFactory);
+        return registry;
+    }
+    
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+    private void sendRequest() throws Exception {
+        // Async request
+        response = (String) producerTemplate.requestBody(
+            
"netty:tcp://localhost:5110?clientPipelineFactory=#cpf&textline=true", 
+            "Forest Gump describing Vietnam...");        
+    }
+    
+    @Test
+    public void testCustomClientPipelineFactory() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            public void configure() {
+                
from("netty:tcp://localhost:5110?serverPipelineFactory=#spf&textline=true")
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws 
Exception {
+                            exchange.getOut().setBody("Forrest Gump: We was 
always taking long walks, and we was always looking for a guy named 
'Charlie'");                           
+                        }
+                    });                
+            }
+        });
+        context.start();
+        
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Beginning Test ---> testCustomClientPipelineFactory()");
+        }        
+        sendRequest();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Completed Test ---> testCustomClientPipelineFactory()");
+        }
+        context.stop();
+        
+        assertEquals("Forrest Gump: We was always taking long walks, and we 
was always looking for a guy named 'Charlie'", response);
+        assertEquals(true, clientPipelineFactory.isfactoryInvoked());
+        assertEquals(true, serverPipelineFactory.isfactoryInvoked());
+    } 
+    
+    public class TestClientChannelPipelineFactory extends 
ClientPipelineFactory {
+        private int maxLineSize = 1024;
+        private boolean invoked;
+        
+        public ChannelPipeline getPipeline() throws Exception {
+            invoked = true;
+            
+            ChannelPipeline channelPipeline = Channels.pipeline();
+
+            channelPipeline.addLast("decoder-DELIM", new 
DelimiterBasedFrameDecoder(maxLineSize, true, Delimiters.lineDelimiter()));
+            channelPipeline.addLast("decoder-SD", new 
StringDecoder(CharsetUtil.UTF_8));
+            channelPipeline.addLast("encoder-SD", new 
StringEncoder(CharsetUtil.UTF_8));            
+            channelPipeline.addLast("handler", new 
ClientChannelHandler(producer, exchange, callback));
+
+            return channelPipeline;
+
+        }
+        
+        public boolean isfactoryInvoked() {
+            return invoked;
+        }
+    }
+    
+    public class TestServerChannelPipelineFactory extends 
ServerPipelineFactory {
+        private int maxLineSize = 1024;
+        private boolean invoked;
+        
+        public ChannelPipeline getPipeline() throws Exception {
+            invoked = true;
+            
+            ChannelPipeline channelPipeline = Channels.pipeline();
+
+            channelPipeline.addLast("encoder-SD", new 
StringEncoder(CharsetUtil.UTF_8));
+            channelPipeline.addLast("decoder-DELIM", new 
DelimiterBasedFrameDecoder(maxLineSize, true, Delimiters.lineDelimiter()));
+            channelPipeline.addLast("decoder-SD", new 
StringDecoder(CharsetUtil.UTF_8));
+            channelPipeline.addLast("handler", new 
ServerChannelHandler(consumer));
+
+            return channelPipeline;
+        }
+        
+        public boolean isfactoryInvoked() {
+            return invoked;
+        }
+        
+    }
+}

Propchange: 
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactoryAsynchTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java?rev=986850&view=auto
==============================================================================
--- 
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java
 (added)
+++ 
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java
 Wed Aug 18 18:48:58 2010
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.netty.handlers.ClientChannelHandler;
+import org.apache.camel.component.netty.handlers.ServerChannelHandler;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.test.CamelTestSupport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.channel.ChannelDownstreamHandler;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelUpstreamHandler;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
+import org.jboss.netty.handler.codec.frame.Delimiters;
+import org.jboss.netty.handler.codec.string.StringDecoder;
+import org.jboss.netty.handler.codec.string.StringEncoder;
+import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
+import org.jboss.netty.util.CharsetUtil;
+import org.junit.Test;
+
+public class NettyCustomPipelineFactorySynchTest extends CamelTestSupport {
+    private static final transient Log LOG = 
LogFactory.getLog(NettyCustomPipelineFactorySynchTest.class);
+
+    @Produce(uri = "direct:start")
+    protected ProducerTemplate producerTemplate;
+    private TestClientChannelPipelineFactory clientPipelineFactory;
+    private TestServerChannelPipelineFactory serverPipelineFactory;
+    private String response;
+    
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry registry = new JndiRegistry(createJndiContext());
+        clientPipelineFactory = new TestClientChannelPipelineFactory();
+        serverPipelineFactory = new TestServerChannelPipelineFactory();
+        registry.bind("cpf", clientPipelineFactory);
+        registry.bind("spf", serverPipelineFactory);
+        return registry;
+    }
+    
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+    private void sendRequest() throws Exception {
+        // Async request
+        response = (String) producerTemplate.requestBody(
+            
"netty:tcp://localhost:5110?clientPipelineFactory=#cpf&sync=true&textline=true",
 
+            "Forest Gump describing Vietnam...");        
+    }
+    
+    @Test
+    public void testCustomClientPipelineFactory() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            public void configure() {
+                
from("netty:tcp://localhost:5110?serverPipelineFactory=#spf&sync=true&textline=true")
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws 
Exception {
+                            exchange.getOut().setBody("Forrest Gump: We was 
always taking long walks, and we was always looking for a guy named 
'Charlie'");                           
+                        }
+                    });                
+            }
+        });
+        context.start();
+        
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Beginning Test ---> testCustomClientPipelineFactory()");
+        }        
+        sendRequest();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Completed Test ---> testCustomClientPipelineFactory()");
+        }
+        context.stop();
+        
+        assertEquals("Forrest Gump: We was always taking long walks, and we 
was always looking for a guy named 'Charlie'", response);
+        assertEquals(true, clientPipelineFactory.isfactoryInvoked());
+        assertEquals(true, serverPipelineFactory.isfactoryInvoked());
+    } 
+    
+    public class TestClientChannelPipelineFactory extends 
ClientPipelineFactory {
+        private int maxLineSize = 1024;
+        private boolean invoked;
+        
+        public ChannelPipeline getPipeline() throws Exception {
+            invoked = true;
+            
+            ChannelPipeline channelPipeline = Channels.pipeline();
+
+            // In Sync mode,adding a read timeout handler to handle timeout 
while waiting for a remote reply
+            if (producer.getConfiguration().getTimeout() > 0) {
+                channelPipeline.addLast("timeout", new 
ReadTimeoutHandler(producer.getEndpoint().getTimer(), 
producer.getConfiguration().getTimeout(), TimeUnit.MILLISECONDS));
+            }
+
+            channelPipeline.addLast("decoder-DELIM", new 
DelimiterBasedFrameDecoder(maxLineSize, true, Delimiters.lineDelimiter()));
+            channelPipeline.addLast("decoder-SD", new 
StringDecoder(CharsetUtil.UTF_8));
+            channelPipeline.addLast("encoder-SD", new 
StringEncoder(CharsetUtil.UTF_8));            
+            channelPipeline.addLast("handler", new 
ClientChannelHandler(producer, exchange, callback));
+
+            return channelPipeline;
+
+        }
+        
+        public boolean isfactoryInvoked() {
+            return invoked;
+        }
+    }
+    
+    public class TestServerChannelPipelineFactory extends 
ServerPipelineFactory {
+        private int maxLineSize = 1024;
+        private boolean invoked;
+        
+        public ChannelPipeline getPipeline() throws Exception {
+            invoked = true;
+            
+            ChannelPipeline channelPipeline = Channels.pipeline();
+
+            channelPipeline.addLast("encoder-SD", new 
StringEncoder(CharsetUtil.UTF_8));
+            channelPipeline.addLast("decoder-DELIM", new 
DelimiterBasedFrameDecoder(maxLineSize, true, Delimiters.lineDelimiter()));
+            channelPipeline.addLast("decoder-SD", new 
StringDecoder(CharsetUtil.UTF_8));
+            channelPipeline.addLast("handler", new 
ServerChannelHandler(consumer));
+
+            return channelPipeline;
+        }
+        
+        public boolean isfactoryInvoked() {
+            return invoked;
+        }
+        
+    }
+}
+

Propchange: 
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java
------------------------------------------------------------------------------
    svn:eol-style = native


Reply via email to