Author: akarpe Date: Wed Aug 18 18:48:58 2010 New Revision: 986850 URL: http://svn.apache.org/viewvc?rev=986850&view=rev Log: CAMEL-2713 Added a registry based option for a custom ChannelPipelineFactory
Added: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java (with props) camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java (with props) camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactoryAsynchTest.java (with props) camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java (with props) Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java?rev=986850&r1=986849&r2=986850&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java Wed Aug 18 18:48:58 2010 @@ -16,95 +16,53 @@ */ package org.apache.camel.component.netty; -import java.util.List; -import java.util.concurrent.TimeUnit; -import javax.net.ssl.SSLEngine; - import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; -import org.apache.camel.component.netty.handlers.ClientChannelHandler; -import org.apache.camel.component.netty.ssl.SSLEngineFactory; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.jboss.netty.channel.ChannelDownstreamHandler; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.channel.ChannelUpstreamHandler; import org.jboss.netty.channel.Channels; -import org.jboss.netty.handler.ssl.SslHandler; -import org.jboss.netty.handler.timeout.ReadTimeoutHandler; -public class ClientPipelineFactory implements ChannelPipelineFactory { - private static final transient Log LOG = LogFactory.getLog(ClientPipelineFactory.class); - private final NettyProducer producer; - private final Exchange exchange; - private final AsyncCallback callback; +public abstract class ClientPipelineFactory implements ChannelPipelineFactory { + protected NettyProducer producer; + protected Exchange exchange; + protected AsyncCallback callback; + public ClientPipelineFactory() { + } + public ClientPipelineFactory(NettyProducer producer, Exchange exchange, AsyncCallback callback) { this.producer = producer; this.exchange = exchange; this.callback = callback; } - + public ChannelPipeline getPipeline() throws Exception { - // create a new pipeline ChannelPipeline channelPipeline = Channels.pipeline(); + return channelPipeline; + } - SslHandler sslHandler = configureClientSSLOnDemand(); - if (sslHandler != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Client SSL handler configured and added to the ChannelPipeline"); - } - channelPipeline.addLast("ssl", sslHandler); - } - - // use read timeout handler to handle timeout while waiting for a remote reply (while reading from the remote host) - if (producer.getConfiguration().getTimeout() > 0) { - channelPipeline.addLast("timeout", new ReadTimeoutHandler(producer.getEndpoint().getTimer(), producer.getConfiguration().getTimeout(), TimeUnit.MILLISECONDS)); - } - - List<ChannelUpstreamHandler> decoders = producer.getConfiguration().getDecoders(); - for (int x = 0; x < decoders.size(); x++) { - channelPipeline.addLast("decoder-" + x, decoders.get(x)); - } - - List<ChannelDownstreamHandler> encoders = producer.getConfiguration().getEncoders(); - for (int x = 0; x < encoders.size(); x++) { - channelPipeline.addLast("encoder-" + x, encoders.get(x)); - } + public NettyProducer getProducer() { + return producer; + } - // our handler must be added last - channelPipeline.addLast("handler", new ClientChannelHandler(producer, exchange, callback)); + public void setProducer(NettyProducer producer) { + this.producer = producer; + } - return channelPipeline; + public Exchange getExchange() { + return exchange; + } + + public void setExchange(Exchange exchange) { + this.exchange = exchange; } - private SslHandler configureClientSSLOnDemand() throws Exception { - if (!producer.getConfiguration().isSsl()) { - return null; - } - - if (producer.getConfiguration().getSslHandler() != null) { - return producer.getConfiguration().getSslHandler(); - } else { - if (producer.getConfiguration().getKeyStoreFile() == null) { - LOG.debug("keystorefile is null"); - } - if (producer.getConfiguration().getTrustStoreFile() == null) { - LOG.debug("truststorefile is null"); - } - if (producer.getConfiguration().getPassphrase().toCharArray() == null) { - LOG.debug("passphrase is null"); - } - SSLEngineFactory sslEngineFactory = new SSLEngineFactory( - producer.getConfiguration().getKeyStoreFormat(), - producer.getConfiguration().getSecurityProvider(), - producer.getConfiguration().getKeyStoreFile(), - producer.getConfiguration().getTrustStoreFile(), - producer.getConfiguration().getPassphrase().toCharArray()); - SSLEngine sslEngine = sslEngineFactory.createClientSSLEngine(); - return new SslHandler(sslEngine); - } + public AsyncCallback getCallback() { + return callback; + } + + public void setCallback(AsyncCallback callback) { + this.callback = callback; } } Added: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java?rev=986850&view=auto ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java (added) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java Wed Aug 18 18:48:58 2010 @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty; + +import java.util.List; +import java.util.concurrent.TimeUnit; +import javax.net.ssl.SSLEngine; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.Exchange; +import org.apache.camel.component.netty.handlers.ClientChannelHandler; +import org.apache.camel.component.netty.ssl.SSLEngineFactory; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.jboss.netty.channel.ChannelDownstreamHandler; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelUpstreamHandler; +import org.jboss.netty.channel.Channels; +import org.jboss.netty.handler.ssl.SslHandler; +import org.jboss.netty.handler.timeout.ReadTimeoutHandler; + +public class DefaultClientPipelineFactory extends ClientPipelineFactory { + private static final transient Log LOG = LogFactory.getLog(ClientPipelineFactory.class); + + public DefaultClientPipelineFactory(NettyProducer producer, Exchange exchange, AsyncCallback callback) { + super(producer, exchange, callback); + } + + public ChannelPipeline getPipeline() throws Exception { + // create a new pipeline + ChannelPipeline channelPipeline = Channels.pipeline(); + + SslHandler sslHandler = configureClientSSLOnDemand(); + if (sslHandler != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Client SSL handler configured and added to the ChannelPipeline"); + } + channelPipeline.addLast("ssl", sslHandler); + } + + // use read timeout handler to handle timeout while waiting for a remote reply (while reading from the remote host) + if (producer.getConfiguration().getTimeout() > 0) { + channelPipeline.addLast("timeout", new ReadTimeoutHandler(producer.getEndpoint().getTimer(), producer.getConfiguration().getTimeout(), TimeUnit.MILLISECONDS)); + } + + List<ChannelUpstreamHandler> decoders = producer.getConfiguration().getDecoders(); + for (int x = 0; x < decoders.size(); x++) { + channelPipeline.addLast("decoder-" + x, decoders.get(x)); + } + + List<ChannelDownstreamHandler> encoders = producer.getConfiguration().getEncoders(); + for (int x = 0; x < encoders.size(); x++) { + channelPipeline.addLast("encoder-" + x, encoders.get(x)); + } + + // our handler must be added last + channelPipeline.addLast("handler", new ClientChannelHandler(producer, exchange, callback)); + + return channelPipeline; + } + + private SslHandler configureClientSSLOnDemand() throws Exception { + if (!producer.getConfiguration().isSsl()) { + return null; + } + + if (producer.getConfiguration().getSslHandler() != null) { + return producer.getConfiguration().getSslHandler(); + } else { + if (producer.getConfiguration().getKeyStoreFile() == null) { + LOG.debug("keystorefile is null"); + } + if (producer.getConfiguration().getTrustStoreFile() == null) { + LOG.debug("truststorefile is null"); + } + if (producer.getConfiguration().getPassphrase().toCharArray() == null) { + LOG.debug("passphrase is null"); + } + SSLEngineFactory sslEngineFactory = new SSLEngineFactory( + producer.getConfiguration().getKeyStoreFormat(), + producer.getConfiguration().getSecurityProvider(), + producer.getConfiguration().getKeyStoreFile(), + producer.getConfiguration().getTrustStoreFile(), + producer.getConfiguration().getPassphrase().toCharArray()); + SSLEngine sslEngine = sslEngineFactory.createClientSSLEngine(); + return new SslHandler(sslEngine); + } + } + +} Propchange: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java ------------------------------------------------------------------------------ svn:eol-style = native Added: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java?rev=986850&view=auto ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java (added) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java Wed Aug 18 18:48:58 2010 @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty; + +import java.util.List; +import javax.net.ssl.SSLEngine; + +import org.apache.camel.component.netty.handlers.ServerChannelHandler; +import org.apache.camel.component.netty.ssl.SSLEngineFactory; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.jboss.netty.channel.ChannelDownstreamHandler; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelPipelineFactory; +import org.jboss.netty.channel.ChannelUpstreamHandler; +import org.jboss.netty.channel.Channels; +import org.jboss.netty.handler.ssl.SslHandler; + +public class DefaultServerPipelineFactory implements ChannelPipelineFactory { + private static final transient Log LOG = LogFactory.getLog(DefaultServerPipelineFactory.class); + private NettyConsumer consumer; + + public DefaultServerPipelineFactory(NettyConsumer consumer) { + this.consumer = consumer; + } + + public ChannelPipeline getPipeline() throws Exception { + ChannelPipeline channelPipeline = Channels.pipeline(); + + SslHandler sslHandler = configureServerSSLOnDemand(); + if (sslHandler != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Server SSL handler configured and added as an interceptor against the ChannelPipeline"); + } + channelPipeline.addLast("ssl", sslHandler); + } + List<ChannelDownstreamHandler> encoders = consumer.getConfiguration().getEncoders(); + for (int x = 0; x < encoders.size(); x++) { + channelPipeline.addLast("encoder-" + x, encoders.get(x)); + } + + List<ChannelUpstreamHandler> decoders = consumer.getConfiguration().getDecoders(); + for (int x = 0; x < decoders.size(); x++) { + channelPipeline.addLast("decoder-" + x, decoders.get(x)); + } + + // our handler must be added last + channelPipeline.addLast("handler", new ServerChannelHandler(consumer)); + + return channelPipeline; + } + + private SslHandler configureServerSSLOnDemand() throws Exception { + if (!consumer.getConfiguration().isSsl()) { + return null; + } + + if (consumer.getConfiguration().getSslHandler() != null) { + return consumer.getConfiguration().getSslHandler(); + } else { + SSLEngineFactory sslEngineFactory = new SSLEngineFactory( + consumer.getConfiguration().getKeyStoreFormat(), + consumer.getConfiguration().getSecurityProvider(), + consumer.getConfiguration().getKeyStoreFile(), + consumer.getConfiguration().getTrustStoreFile(), + consumer.getConfiguration().getPassphrase().toCharArray()); + SSLEngine sslEngine = sslEngineFactory.createServerSSLEngine(); + return new SslHandler(sslEngine); + } + } + +} Propchange: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java?rev=986850&r1=986849&r2=986850&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java Wed Aug 18 18:48:58 2010 @@ -77,7 +77,9 @@ public class NettyConfiguration implemen private boolean disconnectOnNoReply = true; private LoggingLevel noReplyLogLevel = LoggingLevel.WARN; private boolean allowDefaultCodec = true; - + private ClientPipelineFactory clientPipelineFactory; + private ServerPipelineFactory serverPipelineFactory; + /** * Returns a copy of this configuration */ @@ -111,6 +113,8 @@ public class NettyConfiguration implemen securityProvider = component.getAndRemoveParameter(parameters, "securityProvider", String.class, "SunX509"); keyStoreFile = component.resolveAndRemoveReferenceParameter(parameters, "keyStoreFile", File.class, null); trustStoreFile = component.resolveAndRemoveReferenceParameter(parameters, "trustStoreFile", File.class, null); + clientPipelineFactory = component.resolveAndRemoveReferenceParameter(parameters, "clientPipelineFactory", ClientPipelineFactory.class, null); + serverPipelineFactory = component.resolveAndRemoveReferenceParameter(parameters, "serverPipelineFactory", ServerPipelineFactory.class, null); // set custom encoders and decoders first List<ChannelDownstreamHandler> referencedEncoders = component.resolveAndRemoveReferenceListParameter(parameters, "encoders", ChannelDownstreamHandler.class, null); @@ -476,4 +480,20 @@ public class NettyConfiguration implemen } } + public void setClientPipelineFactory(ClientPipelineFactory clientPipelineFactory) { + this.clientPipelineFactory = clientPipelineFactory; + } + + public ClientPipelineFactory getClientPipelineFactory() { + return clientPipelineFactory; + } + + public void setServerPipelineFactory(ServerPipelineFactory serverPipelineFactory) { + this.serverPipelineFactory = serverPipelineFactory; + } + + public ServerPipelineFactory getServerPipelineFactory() { + return serverPipelineFactory; + } + } Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java?rev=986850&r1=986849&r2=986850&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java Wed Aug 18 18:48:58 2010 @@ -154,7 +154,12 @@ public class NettyConsumer extends Defau channelFactory = new NioServerSocketChannelFactory(bossExecutor, workerExecutor); serverBootstrap = new ServerBootstrap(channelFactory); - serverBootstrap.setPipelineFactory(new ServerPipelineFactory(this)); + if (configuration.getServerPipelineFactory() != null) { + configuration.getServerPipelineFactory().setConsumer(this); + serverBootstrap.setPipelineFactory(configuration.getServerPipelineFactory()); + } else { + serverBootstrap.setPipelineFactory(new DefaultServerPipelineFactory(this)); + } serverBootstrap.setOption("child.keepAlive", configuration.isKeepAlive()); serverBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay()); serverBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress()); @@ -171,7 +176,12 @@ public class NettyConsumer extends Defau datagramChannelFactory = new NioDatagramChannelFactory(workerExecutor); connectionlessServerBootstrap = new ConnectionlessBootstrap(datagramChannelFactory); - connectionlessServerBootstrap.setPipelineFactory(new ServerPipelineFactory(this)); + if (configuration.getServerPipelineFactory() != null) { + configuration.getServerPipelineFactory().setConsumer(this); + connectionlessServerBootstrap.setPipelineFactory(configuration.getServerPipelineFactory()); + } else { + connectionlessServerBootstrap.setPipelineFactory(new DefaultServerPipelineFactory(this)); + } connectionlessServerBootstrap.setOption("child.keepAlive", configuration.isKeepAlive()); connectionlessServerBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay()); connectionlessServerBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress()); Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=986850&r1=986849&r2=986850&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java Wed Aug 18 18:48:58 2010 @@ -232,11 +232,20 @@ public class NettyProducer extends Defau private ChannelFuture openConnection(Exchange exchange, AsyncCallback callback) throws Exception { ChannelFuture answer; + ChannelPipeline clientPipeline; - // initialize client pipeline factory - ClientPipelineFactory clientPipelineFactory = new ClientPipelineFactory(this, exchange, callback); - // must get the pipeline from the factory when opening a new connection - ChannelPipeline clientPipeline = clientPipelineFactory.getPipeline(); + if (configuration.getClientPipelineFactory() != null) { + // initialize user defined client pipeline factory + configuration.getClientPipelineFactory().setProducer(this); + configuration.getClientPipelineFactory().setExchange(exchange); + configuration.getClientPipelineFactory().setCallback(callback); + clientPipeline = configuration.getClientPipelineFactory().getPipeline(); + } else { + // initialize client pipeline factory + ClientPipelineFactory clientPipelineFactory = new DefaultClientPipelineFactory(this, exchange, callback); + // must get the pipeline from the factory when opening a new connection + clientPipeline = clientPipelineFactory.getPipeline(); + } if (isTcp()) { ClientBootstrap clientBootstrap = new ClientBootstrap(channelFactory); Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java?rev=986850&r1=986849&r2=986850&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java Wed Aug 18 18:48:58 2010 @@ -16,71 +16,31 @@ */ package org.apache.camel.component.netty; -import java.util.List; -import javax.net.ssl.SSLEngine; - -import org.apache.camel.component.netty.handlers.ServerChannelHandler; -import org.apache.camel.component.netty.ssl.SSLEngineFactory; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.jboss.netty.channel.ChannelDownstreamHandler; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.channel.ChannelUpstreamHandler; import org.jboss.netty.channel.Channels; -import org.jboss.netty.handler.ssl.SslHandler; -public class ServerPipelineFactory implements ChannelPipelineFactory { - private static final transient Log LOG = LogFactory.getLog(ServerPipelineFactory.class); - private NettyConsumer consumer; - +public abstract class ServerPipelineFactory implements ChannelPipelineFactory { + protected NettyConsumer consumer; + + public ServerPipelineFactory() { + } + public ServerPipelineFactory(NettyConsumer consumer) { this.consumer = consumer; } public ChannelPipeline getPipeline() throws Exception { ChannelPipeline channelPipeline = Channels.pipeline(); - - SslHandler sslHandler = configureServerSSLOnDemand(); - if (sslHandler != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Server SSL handler configured and added as an interceptor against the ChannelPipeline"); - } - channelPipeline.addLast("ssl", sslHandler); - } - List<ChannelDownstreamHandler> encoders = consumer.getConfiguration().getEncoders(); - for (int x = 0; x < encoders.size(); x++) { - channelPipeline.addLast("encoder-" + x, encoders.get(x)); - } - - List<ChannelUpstreamHandler> decoders = consumer.getConfiguration().getDecoders(); - for (int x = 0; x < decoders.size(); x++) { - channelPipeline.addLast("decoder-" + x, decoders.get(x)); - } - - // our handler must be added last - channelPipeline.addLast("handler", new ServerChannelHandler(consumer)); - return channelPipeline; } - - private SslHandler configureServerSSLOnDemand() throws Exception { - if (!consumer.getConfiguration().isSsl()) { - return null; - } - if (consumer.getConfiguration().getSslHandler() != null) { - return consumer.getConfiguration().getSslHandler(); - } else { - SSLEngineFactory sslEngineFactory = new SSLEngineFactory( - consumer.getConfiguration().getKeyStoreFormat(), - consumer.getConfiguration().getSecurityProvider(), - consumer.getConfiguration().getKeyStoreFile(), - consumer.getConfiguration().getTrustStoreFile(), - consumer.getConfiguration().getPassphrase().toCharArray()); - SSLEngine sslEngine = sslEngineFactory.createServerSSLEngine(); - return new SslHandler(sslEngine); - } - } + public NettyConsumer getConsumer() { + return consumer; + } + public void setConsumer(NettyConsumer consumer) { + this.consumer = consumer; + } + } Added: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactoryAsynchTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactoryAsynchTest.java?rev=986850&view=auto ============================================================================== --- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactoryAsynchTest.java (added) +++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactoryAsynchTest.java Wed Aug 18 18:48:58 2010 @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.netty.handlers.ClientChannelHandler; +import org.apache.camel.component.netty.handlers.ServerChannelHandler; +import org.apache.camel.impl.JndiRegistry; +import org.apache.camel.test.CamelTestSupport; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.jboss.netty.channel.ChannelDownstreamHandler; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelUpstreamHandler; +import org.jboss.netty.channel.Channels; +import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder; +import org.jboss.netty.handler.codec.frame.Delimiters; +import org.jboss.netty.handler.codec.string.StringDecoder; +import org.jboss.netty.handler.codec.string.StringEncoder; +import org.jboss.netty.handler.timeout.ReadTimeoutHandler; +import org.jboss.netty.util.CharsetUtil; +import org.junit.Test; + +public class NettyCustomPipelineFactoryAsynchTest extends CamelTestSupport { + private static final transient Log LOG = LogFactory.getLog(NettyCustomPipelineFactoryAsynchTest.class); + + @Produce(uri = "direct:start") + protected ProducerTemplate producerTemplate; + private TestClientChannelPipelineFactory clientPipelineFactory; + private TestServerChannelPipelineFactory serverPipelineFactory; + private String response; + + @Override + protected JndiRegistry createRegistry() throws Exception { + JndiRegistry registry = new JndiRegistry(createJndiContext()); + clientPipelineFactory = new TestClientChannelPipelineFactory(); + serverPipelineFactory = new TestServerChannelPipelineFactory(); + registry.bind("cpf", clientPipelineFactory); + registry.bind("spf", serverPipelineFactory); + return registry; + } + + @Override + public boolean isUseRouteBuilder() { + return false; + } + + private void sendRequest() throws Exception { + // Async request + response = (String) producerTemplate.requestBody( + "netty:tcp://localhost:5110?clientPipelineFactory=#cpf&textline=true", + "Forest Gump describing Vietnam..."); + } + + @Test + public void testCustomClientPipelineFactory() throws Exception { + context.addRoutes(new RouteBuilder() { + public void configure() { + from("netty:tcp://localhost:5110?serverPipelineFactory=#spf&textline=true") + .process(new Processor() { + public void process(Exchange exchange) throws Exception { + exchange.getOut().setBody("Forrest Gump: We was always taking long walks, and we was always looking for a guy named 'Charlie'"); + } + }); + } + }); + context.start(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Beginning Test ---> testCustomClientPipelineFactory()"); + } + sendRequest(); + if (LOG.isDebugEnabled()) { + LOG.debug("Completed Test ---> testCustomClientPipelineFactory()"); + } + context.stop(); + + assertEquals("Forrest Gump: We was always taking long walks, and we was always looking for a guy named 'Charlie'", response); + assertEquals(true, clientPipelineFactory.isfactoryInvoked()); + assertEquals(true, serverPipelineFactory.isfactoryInvoked()); + } + + public class TestClientChannelPipelineFactory extends ClientPipelineFactory { + private int maxLineSize = 1024; + private boolean invoked; + + public ChannelPipeline getPipeline() throws Exception { + invoked = true; + + ChannelPipeline channelPipeline = Channels.pipeline(); + + channelPipeline.addLast("decoder-DELIM", new DelimiterBasedFrameDecoder(maxLineSize, true, Delimiters.lineDelimiter())); + channelPipeline.addLast("decoder-SD", new StringDecoder(CharsetUtil.UTF_8)); + channelPipeline.addLast("encoder-SD", new StringEncoder(CharsetUtil.UTF_8)); + channelPipeline.addLast("handler", new ClientChannelHandler(producer, exchange, callback)); + + return channelPipeline; + + } + + public boolean isfactoryInvoked() { + return invoked; + } + } + + public class TestServerChannelPipelineFactory extends ServerPipelineFactory { + private int maxLineSize = 1024; + private boolean invoked; + + public ChannelPipeline getPipeline() throws Exception { + invoked = true; + + ChannelPipeline channelPipeline = Channels.pipeline(); + + channelPipeline.addLast("encoder-SD", new StringEncoder(CharsetUtil.UTF_8)); + channelPipeline.addLast("decoder-DELIM", new DelimiterBasedFrameDecoder(maxLineSize, true, Delimiters.lineDelimiter())); + channelPipeline.addLast("decoder-SD", new StringDecoder(CharsetUtil.UTF_8)); + channelPipeline.addLast("handler", new ServerChannelHandler(consumer)); + + return channelPipeline; + } + + public boolean isfactoryInvoked() { + return invoked; + } + + } +} Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactoryAsynchTest.java ------------------------------------------------------------------------------ svn:eol-style = native Added: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java?rev=986850&view=auto ============================================================================== --- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java (added) +++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java Wed Aug 18 18:48:58 2010 @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.netty.handlers.ClientChannelHandler; +import org.apache.camel.component.netty.handlers.ServerChannelHandler; +import org.apache.camel.impl.JndiRegistry; +import org.apache.camel.test.CamelTestSupport; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.jboss.netty.channel.ChannelDownstreamHandler; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelUpstreamHandler; +import org.jboss.netty.channel.Channels; +import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder; +import org.jboss.netty.handler.codec.frame.Delimiters; +import org.jboss.netty.handler.codec.string.StringDecoder; +import org.jboss.netty.handler.codec.string.StringEncoder; +import org.jboss.netty.handler.timeout.ReadTimeoutHandler; +import org.jboss.netty.util.CharsetUtil; +import org.junit.Test; + +public class NettyCustomPipelineFactorySynchTest extends CamelTestSupport { + private static final transient Log LOG = LogFactory.getLog(NettyCustomPipelineFactorySynchTest.class); + + @Produce(uri = "direct:start") + protected ProducerTemplate producerTemplate; + private TestClientChannelPipelineFactory clientPipelineFactory; + private TestServerChannelPipelineFactory serverPipelineFactory; + private String response; + + @Override + protected JndiRegistry createRegistry() throws Exception { + JndiRegistry registry = new JndiRegistry(createJndiContext()); + clientPipelineFactory = new TestClientChannelPipelineFactory(); + serverPipelineFactory = new TestServerChannelPipelineFactory(); + registry.bind("cpf", clientPipelineFactory); + registry.bind("spf", serverPipelineFactory); + return registry; + } + + @Override + public boolean isUseRouteBuilder() { + return false; + } + + private void sendRequest() throws Exception { + // Async request + response = (String) producerTemplate.requestBody( + "netty:tcp://localhost:5110?clientPipelineFactory=#cpf&sync=true&textline=true", + "Forest Gump describing Vietnam..."); + } + + @Test + public void testCustomClientPipelineFactory() throws Exception { + context.addRoutes(new RouteBuilder() { + public void configure() { + from("netty:tcp://localhost:5110?serverPipelineFactory=#spf&sync=true&textline=true") + .process(new Processor() { + public void process(Exchange exchange) throws Exception { + exchange.getOut().setBody("Forrest Gump: We was always taking long walks, and we was always looking for a guy named 'Charlie'"); + } + }); + } + }); + context.start(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Beginning Test ---> testCustomClientPipelineFactory()"); + } + sendRequest(); + if (LOG.isDebugEnabled()) { + LOG.debug("Completed Test ---> testCustomClientPipelineFactory()"); + } + context.stop(); + + assertEquals("Forrest Gump: We was always taking long walks, and we was always looking for a guy named 'Charlie'", response); + assertEquals(true, clientPipelineFactory.isfactoryInvoked()); + assertEquals(true, serverPipelineFactory.isfactoryInvoked()); + } + + public class TestClientChannelPipelineFactory extends ClientPipelineFactory { + private int maxLineSize = 1024; + private boolean invoked; + + public ChannelPipeline getPipeline() throws Exception { + invoked = true; + + ChannelPipeline channelPipeline = Channels.pipeline(); + + // In Sync mode,adding a read timeout handler to handle timeout while waiting for a remote reply + if (producer.getConfiguration().getTimeout() > 0) { + channelPipeline.addLast("timeout", new ReadTimeoutHandler(producer.getEndpoint().getTimer(), producer.getConfiguration().getTimeout(), TimeUnit.MILLISECONDS)); + } + + channelPipeline.addLast("decoder-DELIM", new DelimiterBasedFrameDecoder(maxLineSize, true, Delimiters.lineDelimiter())); + channelPipeline.addLast("decoder-SD", new StringDecoder(CharsetUtil.UTF_8)); + channelPipeline.addLast("encoder-SD", new StringEncoder(CharsetUtil.UTF_8)); + channelPipeline.addLast("handler", new ClientChannelHandler(producer, exchange, callback)); + + return channelPipeline; + + } + + public boolean isfactoryInvoked() { + return invoked; + } + } + + public class TestServerChannelPipelineFactory extends ServerPipelineFactory { + private int maxLineSize = 1024; + private boolean invoked; + + public ChannelPipeline getPipeline() throws Exception { + invoked = true; + + ChannelPipeline channelPipeline = Channels.pipeline(); + + channelPipeline.addLast("encoder-SD", new StringEncoder(CharsetUtil.UTF_8)); + channelPipeline.addLast("decoder-DELIM", new DelimiterBasedFrameDecoder(maxLineSize, true, Delimiters.lineDelimiter())); + channelPipeline.addLast("decoder-SD", new StringDecoder(CharsetUtil.UTF_8)); + channelPipeline.addLast("handler", new ServerChannelHandler(consumer)); + + return channelPipeline; + } + + public boolean isfactoryInvoked() { + return invoked; + } + + } +} + Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java ------------------------------------------------------------------------------ svn:eol-style = native