Author: davsclaus Date: Sun Apr 8 13:41:45 2012 New Revision: 1310999 URL: http://svn.apache.org/viewvc?rev=1310999&view=rev Log: CAMEL-4960: Pipeline factories are now stateless and thread-safe.
Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyCamelState.java camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactoryAsynchTest.java camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java?rev=1310999&r1=1310998&r2=1310999&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java Sun Apr 8 13:41:45 2012 @@ -18,28 +18,28 @@ package org.apache.camel.component.netty import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.channel.Channels; +/** + * Factory to create {@link ChannelPipeline} for clients, eg {@link NettyProducer}. 
+ * <p/> + * Implementators should use implement the {@link #getPipeline(NettyProducer)} method. + * + * @see ChannelPipelineFactory + */ public abstract class ClientPipelineFactory implements ChannelPipelineFactory { - protected NettyProducer producer; public ClientPipelineFactory() { } - public ClientPipelineFactory(NettyProducer producer) { - this.producer = producer; - } - - public ChannelPipeline getPipeline() throws Exception { - ChannelPipeline channelPipeline = Channels.pipeline(); - return channelPipeline; - } + /** + * Returns a newly created {@link ChannelPipeline}. + * + * @param producer the netty producer + */ + public abstract ChannelPipeline getPipeline(NettyProducer producer) throws Exception; - public NettyProducer getProducer() { - return producer; - } - - public void setProducer(NettyProducer producer) { - this.producer = producer; + @Override + public ChannelPipeline getPipeline() throws Exception { + throw new UnsupportedOperationException("use getPipeline(NettyProducer) instead"); } } Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java?rev=1310999&r1=1310998&r2=1310999&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java Sun Apr 8 13:41:45 2012 @@ -33,15 +33,12 @@ import org.slf4j.LoggerFactory; public class DefaultClientPipelineFactory extends ClientPipelineFactory { private static final transient Logger LOG = LoggerFactory.getLogger(DefaultClientPipelineFactory.class); - public DefaultClientPipelineFactory(NettyProducer producer) { - 
super(producer); - } - - public ChannelPipeline getPipeline() throws Exception { + @Override + public ChannelPipeline getPipeline(NettyProducer producer) throws Exception { // create a new pipeline ChannelPipeline channelPipeline = Channels.pipeline(); - SslHandler sslHandler = configureClientSSLOnDemand(); + SslHandler sslHandler = configureClientSSLOnDemand(producer); if (sslHandler != null) { LOG.debug("Client SSL handler configured and added to the ChannelPipeline"); channelPipeline.addLast("ssl", sslHandler); @@ -63,7 +60,7 @@ public class DefaultClientPipelineFactor return channelPipeline; } - private SslHandler configureClientSSLOnDemand() throws Exception { + private SslHandler configureClientSSLOnDemand(NettyProducer producer) throws Exception { if (!producer.getConfiguration().isSsl()) { return null; } Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java?rev=1310999&r1=1310998&r2=1310999&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java Sun Apr 8 13:41:45 2012 @@ -17,7 +17,6 @@ package org.apache.camel.component.netty; import java.util.List; - import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; @@ -25,25 +24,20 @@ import org.apache.camel.component.netty. 
import org.apache.camel.component.netty.ssl.SSLEngineFactory; import org.jboss.netty.channel.ChannelDownstreamHandler; import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.ChannelUpstreamHandler; import org.jboss.netty.channel.Channels; import org.jboss.netty.handler.ssl.SslHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class DefaultServerPipelineFactory implements ChannelPipelineFactory { +public class DefaultServerPipelineFactory extends ServerPipelineFactory { private static final transient Logger LOG = LoggerFactory.getLogger(DefaultServerPipelineFactory.class); - private NettyConsumer consumer; - - public DefaultServerPipelineFactory(NettyConsumer consumer) { - this.consumer = consumer; - } - public ChannelPipeline getPipeline() throws Exception { + @Override + public ChannelPipeline getPipeline(NettyConsumer consumer) throws Exception { ChannelPipeline channelPipeline = Channels.pipeline(); - SslHandler sslHandler = configureServerSSLOnDemand(); + SslHandler sslHandler = configureServerSSLOnDemand(consumer); if (sslHandler != null) { LOG.debug("Server SSL handler configured and added as an interceptor against the ChannelPipeline"); channelPipeline.addLast("ssl", sslHandler); @@ -64,7 +58,7 @@ public class DefaultServerPipelineFactor return channelPipeline; } - private SslHandler configureServerSSLOnDemand() throws Exception { + private SslHandler configureServerSSLOnDemand(NettyConsumer consumer) throws Exception { if (!consumer.getConfiguration().isSsl()) { return null; } Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyCamelState.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyCamelState.java?rev=1310999&r1=1310998&r2=1310999&view=diff ============================================================================== --- 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyCamelState.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyCamelState.java Sun Apr 8 13:41:45 2012 @@ -20,7 +20,11 @@ import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; /** - * + * Stores state for {@link NettyProducer} when sending messages. + * <p/> + * This allows the {@link org.apache.camel.component.netty.handlers.ClientChannelHandler} to access + * this state, which is needed so we can get hold of the current {@link Exchange} and the + * {@link AsyncCallback} so we can continue routing the message in the Camel routing engine. */ public final class NettyCamelState { Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java?rev=1310999&r1=1310998&r2=1310999&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java Sun Apr 8 13:41:45 2012 @@ -26,6 +26,7 @@ import org.jboss.netty.bootstrap.Connect import org.jboss.netty.bootstrap.ServerBootstrap; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFactory; +import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.FixedReceiveBufferSizePredictorFactory; import org.jboss.netty.channel.group.ChannelGroup; import org.jboss.netty.channel.group.ChannelGroupFuture; @@ -45,6 +46,7 @@ public class NettyConsumer extends Defau private DatagramChannelFactory datagramChannelFactory; private ServerBootstrap serverBootstrap; private ConnectionlessBootstrap 
connectionlessServerBootstrap; + private ServerPipelineFactory pipelineFactory; private Channel channel; private ExecutorService bossExecutor; private ExecutorService workerExecutor; @@ -63,9 +65,16 @@ public class NettyConsumer extends Defau @Override protected void doStart() throws Exception { + super.doStart(); + LOG.debug("Netty consumer binding to: {}", configuration.getAddress()); - super.doStart(); + // setup pipeline factory + pipelineFactory = configuration.getServerPipelineFactory(); + if (pipelineFactory == null) { + pipelineFactory = new DefaultServerPipelineFactory(); + } + if (isTcp()) { initializeTCPServerSocketCommunicationLayer(); } else { @@ -97,9 +106,9 @@ public class NettyConsumer extends Defau context.getExecutorServiceManager().shutdownNow(workerExecutor); } - super.doStop(); - LOG.info("Netty consumer unbound from: " + configuration.getAddress()); + + super.doStop(); } public CamelContext getContext() { @@ -165,18 +174,16 @@ public class NettyConsumer extends Defau configuration.getWorkerCount()); } serverBootstrap = new ServerBootstrap(channelFactory); - if (configuration.getServerPipelineFactory() != null) { - configuration.getServerPipelineFactory().setConsumer(this); - serverBootstrap.setPipelineFactory(configuration.getServerPipelineFactory()); - } else { - serverBootstrap.setPipelineFactory(new DefaultServerPipelineFactory(this)); - } serverBootstrap.setOption("child.keepAlive", configuration.isKeepAlive()); serverBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay()); serverBootstrap.setOption("reuseAddress", configuration.isReuseAddress()); serverBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress()); serverBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeout()); + // must get the pipeline from the factory when opening a new connection + ChannelPipeline serverPipeline = pipelineFactory.getPipeline(this); + serverBootstrap.setPipeline(serverPipeline); + channel = 
serverBootstrap.bind(new InetSocketAddress(configuration.getHost(), configuration.getPort())); // to keep track of all channels in use allChannels.add(channel); @@ -190,12 +197,6 @@ public class NettyConsumer extends Defau datagramChannelFactory = new NioDatagramChannelFactory(workerExecutor, configuration.getWorkerCount()); } connectionlessServerBootstrap = new ConnectionlessBootstrap(datagramChannelFactory); - if (configuration.getServerPipelineFactory() != null) { - configuration.getServerPipelineFactory().setConsumer(this); - connectionlessServerBootstrap.setPipelineFactory(configuration.getServerPipelineFactory()); - } else { - connectionlessServerBootstrap.setPipelineFactory(new DefaultServerPipelineFactory(this)); - } connectionlessServerBootstrap.setOption("child.keepAlive", configuration.isKeepAlive()); connectionlessServerBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay()); connectionlessServerBootstrap.setOption("reuseAddress", configuration.isReuseAddress()); @@ -210,6 +211,10 @@ public class NettyConsumer extends Defau new FixedReceiveBufferSizePredictorFactory(configuration.getReceiveBufferSizePredictor())); } + // must get the pipeline from the factory when opening a new connection + ChannelPipeline serverPipeline = pipelineFactory.getPipeline(this); + connectionlessServerBootstrap.setPipeline(serverPipeline); + channel = connectionlessServerBootstrap.bind(new InetSocketAddress(configuration.getHost(), configuration.getPort())); // to keep track of all channels in use allChannels.add(channel); Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=1310999&r1=1310998&r2=1310999&view=diff ============================================================================== --- 
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java Sun Apr 8 13:41:45 2012 @@ -54,6 +54,7 @@ public class NettyProducer extends Defau private NettyConfiguration configuration; private ChannelFactory channelFactory; private DatagramChannelFactory datagramChannelFactory; + private ClientPipelineFactory pipelineFactory; private CamelLogger noReplyLogger; private ExecutorService bossExecutor; private ExecutorService workerExecutor; @@ -92,6 +93,12 @@ public class NettyProducer extends Defau protected void doStart() throws Exception { super.doStart(); + // setup pipeline factory + pipelineFactory = configuration.getClientPipelineFactory(); + if (pipelineFactory == null) { + pipelineFactory = new DefaultClientPipelineFactory(); + } + if (isTcp()) { setupTCPCommunication(); } else { @@ -260,16 +267,8 @@ public class NettyProducer extends Defau ChannelFuture answer; ChannelPipeline clientPipeline; - if (configuration.getClientPipelineFactory() != null) { - // initialize user defined client pipeline factory - configuration.getClientPipelineFactory().setProducer(this); - clientPipeline = configuration.getClientPipelineFactory().getPipeline(); - } else { - // initialize client pipeline factory - ClientPipelineFactory clientPipelineFactory = new DefaultClientPipelineFactory(this); - // must get the pipeline from the factory when opening a new connection - clientPipeline = clientPipelineFactory.getPipeline(); - } + // must get the pipeline from the factory when opening a new connection + clientPipeline = pipelineFactory.getPipeline(this); if (isTcp()) { ClientBootstrap clientBootstrap = new ClientBootstrap(channelFactory); @@ -283,7 +282,6 @@ public class NettyProducer extends Defau answer = clientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort())); return answer; } else { - 
// TODO: Is this correct for a UDP client ConnectionlessBootstrap connectionlessClientBootstrap = new ConnectionlessBootstrap(datagramChannelFactory); connectionlessClientBootstrap.setOption("child.keepAlive", configuration.isKeepAlive()); connectionlessClientBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay()); @@ -295,7 +293,9 @@ public class NettyProducer extends Defau // set the pipeline on the bootstrap connectionlessClientBootstrap.setPipeline(clientPipeline); - connectionlessClientBootstrap.bind(new InetSocketAddress(0)); + // bind and store channel so we can close it when stopping + Channel channel = connectionlessClientBootstrap.bind(new InetSocketAddress(0)); + ALL_CHANNELS.add(channel); answer = connectionlessClientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort())); return answer; } Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java?rev=1310999&r1=1310998&r2=1310999&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java Sun Apr 8 13:41:45 2012 @@ -18,29 +18,26 @@ package org.apache.camel.component.netty import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.channel.Channels; +/** + * Factory to create {@link ChannelPipeline} for clients, eg {@link NettyConsumer}. + * <p/> + * Implementators should use implement the {@link #getPipeline(NettyConsumer)} method. 
+ * + * @see ChannelPipelineFactory + */ public abstract class ServerPipelineFactory implements ChannelPipelineFactory { - protected NettyConsumer consumer; - - public ServerPipelineFactory() { - } - - public ServerPipelineFactory(NettyConsumer consumer) { - this.consumer = consumer; - } - public ChannelPipeline getPipeline() throws Exception { - ChannelPipeline channelPipeline = Channels.pipeline(); - return channelPipeline; - } + /** + * Returns a newly created {@link ChannelPipeline}. + * + * @param consumer the netty consumer + */ + public abstract ChannelPipeline getPipeline(NettyConsumer consumer) throws Exception; - public NettyConsumer getConsumer() { - return consumer; + @Override + public ChannelPipeline getPipeline() throws Exception { + throw new UnsupportedOperationException("use getPipeline(NettyConsumer) instead"); } - public void setConsumer(NettyConsumer consumer) { - this.consumer = consumer; - } - } Modified: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactoryAsynchTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactoryAsynchTest.java?rev=1310999&r1=1310998&r2=1310999&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactoryAsynchTest.java (original) +++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactoryAsynchTest.java Sun Apr 8 13:41:45 2012 @@ -81,15 +81,16 @@ public class NettyCustomPipelineFactoryA context.stop(); assertEquals("Forrest Gump: We was always taking long walks, and we was always looking for a guy named 'Charlie'", response); - assertEquals(true, clientPipelineFactory.isfactoryInvoked()); - assertEquals(true, serverPipelineFactory.isfactoryInvoked()); + assertEquals(true, 
clientPipelineFactory.isFactoryInvoked()); + assertEquals(true, serverPipelineFactory.isFactoryInvoked()); } public class TestClientChannelPipelineFactory extends ClientPipelineFactory { private int maxLineSize = 1024; private boolean invoked; - - public ChannelPipeline getPipeline() throws Exception { + + @Override + public ChannelPipeline getPipeline(NettyProducer producer) throws Exception { invoked = true; ChannelPipeline channelPipeline = Channels.pipeline(); @@ -102,7 +103,7 @@ public class NettyCustomPipelineFactoryA return channelPipeline; } - public boolean isfactoryInvoked() { + public boolean isFactoryInvoked() { return invoked; } } @@ -110,8 +111,9 @@ public class NettyCustomPipelineFactoryA public class TestServerChannelPipelineFactory extends ServerPipelineFactory { private int maxLineSize = 1024; private boolean invoked; - - public ChannelPipeline getPipeline() throws Exception { + + @Override + public ChannelPipeline getPipeline(NettyConsumer consumer) throws Exception { invoked = true; ChannelPipeline channelPipeline = Channels.pipeline(); @@ -124,7 +126,7 @@ public class NettyCustomPipelineFactoryA return channelPipeline; } - public boolean isfactoryInvoked() { + public boolean isFactoryInvoked() { return invoked; } Modified: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java?rev=1310999&r1=1310998&r2=1310999&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java (original) +++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java Sun Apr 8 13:41:45 2012 @@ -81,15 +81,16 @@ public class 
NettyCustomPipelineFactoryS context.stop(); assertEquals("Forrest Gump: We was always taking long walks, and we was always looking for a guy named 'Charlie'", response); - assertEquals(true, clientPipelineFactory.isfactoryInvoked()); - assertEquals(true, serverPipelineFactory.isfactoryInvoked()); + assertEquals(true, clientPipelineFactory.isFactoryInvoked()); + assertEquals(true, serverPipelineFactory.isFactoryInvoked()); } public class TestClientChannelPipelineFactory extends ClientPipelineFactory { private int maxLineSize = 1024; private boolean invoked; - - public ChannelPipeline getPipeline() throws Exception { + + @Override + public ChannelPipeline getPipeline(NettyProducer producer) throws Exception { invoked = true; ChannelPipeline channelPipeline = Channels.pipeline(); @@ -102,7 +103,7 @@ public class NettyCustomPipelineFactoryS return channelPipeline; } - public boolean isfactoryInvoked() { + public boolean isFactoryInvoked() { return invoked; } } @@ -110,8 +111,9 @@ public class NettyCustomPipelineFactoryS public class TestServerChannelPipelineFactory extends ServerPipelineFactory { private int maxLineSize = 1024; private boolean invoked; - - public ChannelPipeline getPipeline() throws Exception { + + @Override + public ChannelPipeline getPipeline(NettyConsumer consumer) throws Exception { invoked = true; ChannelPipeline channelPipeline = Channels.pipeline(); @@ -124,7 +126,7 @@ public class NettyCustomPipelineFactoryS return channelPipeline; } - public boolean isfactoryInvoked() { + public boolean isFactoryInvoked() { return invoked; } }