Author: davsclaus Date: Sun Apr 8 12:54:55 2012 New Revision: 1310993 URL: http://svn.apache.org/viewvc?rev=1310993&view=rev Log: CAMEL-4556: Netty producer now reuses connection.
Added: camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyCamelState.java - copied unchanged from r1310989, camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyCamelState.java Modified: camel/branches/camel-2.9.x/ (props changed) camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java camel/branches/camel-2.9.x/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactoryAsynchTest.java camel/branches/camel-2.9.x/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java Propchange: camel/branches/camel-2.9.x/ ------------------------------------------------------------------------------ Merged /camel/trunk:r1310989 Propchange: camel/branches/camel-2.9.x/ ------------------------------------------------------------------------------ Binary property 'svnmerge-integrated' - no diff available. Modified: camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java?rev=1310993&r1=1310992&r2=1310993&view=diff ============================================================================== --- camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java (original) +++ camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java Sun Apr 8 12:54:55 2012 @@ -16,24 +16,18 @@ */ package org.apache.camel.component.netty; -import org.apache.camel.AsyncCallback; -import org.apache.camel.Exchange; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.Channels; public abstract class ClientPipelineFactory implements ChannelPipelineFactory { protected NettyProducer producer; - protected Exchange exchange; - protected AsyncCallback callback; public ClientPipelineFactory() { } - - public ClientPipelineFactory(NettyProducer producer, Exchange exchange, AsyncCallback callback) { + + public ClientPipelineFactory(NettyProducer producer) { this.producer = producer; - this.exchange = exchange; - this.callback = callback; } public ChannelPipeline getPipeline() throws Exception { @@ -48,21 +42,4 @@ public abstract class ClientPipelineFact public void setProducer(NettyProducer producer) { this.producer = producer; } - - public Exchange getExchange() { - return exchange; - } - - public void setExchange(Exchange exchange) { - this.exchange = exchange; - } - - public AsyncCallback getCallback() { - return callback; - } - - public void setCallback(AsyncCallback callback) { - this.callback = callback; - } - } Modified: camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java?rev=1310993&r1=1310992&r2=1310993&view=diff ============================================================================== --- camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java (original) +++ camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java Sun Apr 8 12:54:55 2012 @@ -17,12 +17,9 @@ package org.apache.camel.component.netty; import java.util.List; - import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; -import org.apache.camel.AsyncCallback; -import org.apache.camel.Exchange; import org.apache.camel.component.netty.handlers.ClientChannelHandler; import org.apache.camel.component.netty.ssl.SSLEngineFactory; import org.jboss.netty.channel.ChannelDownstreamHandler; @@ -34,10 +31,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class DefaultClientPipelineFactory extends ClientPipelineFactory { - private static final transient Logger LOG = LoggerFactory.getLogger(ClientPipelineFactory.class); + private static final transient Logger LOG = LoggerFactory.getLogger(DefaultClientPipelineFactory.class); - public DefaultClientPipelineFactory(NettyProducer producer, Exchange exchange, AsyncCallback callback) { - super(producer, exchange, callback); + public DefaultClientPipelineFactory(NettyProducer producer) { + super(producer); } public ChannelPipeline getPipeline() throws Exception { @@ -61,7 +58,7 @@ public class DefaultClientPipelineFactor } // our handler must be added last - channelPipeline.addLast("handler", new ClientChannelHandler(producer, exchange, callback)); + channelPipeline.addLast("handler", new ClientChannelHandler(producer)); return channelPipeline; } Modified: camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=1310993&r1=1310992&r2=1310993&view=diff ============================================================================== --- camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java (original) +++ camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java Sun Apr 8 12:54:55 2012 @@ -27,7 +27,6 @@ import org.apache.camel.Exchange; import org.apache.camel.NoTypeConversionAvailableException; import org.apache.camel.ServicePoolAware; import org.apache.camel.impl.DefaultAsyncProducer; -import org.apache.camel.impl.DefaultExchange; import org.apache.camel.util.CamelLogger; import org.apache.camel.util.ExchangeHelper; import org.apache.camel.util.IOHelper; @@ -37,6 +36,7 @@ import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFactory; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFutureListener; +import org.jboss.netty.channel.ChannelLocal; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.group.ChannelGroup; import org.jboss.netty.channel.group.ChannelGroupFuture; @@ -57,6 +57,9 @@ public class NettyProducer extends Defau private CamelLogger noReplyLogger; private ExecutorService bossExecutor; private ExecutorService workerExecutor; + private final ChannelLocal<NettyCamelState> state = new ChannelLocal<NettyCamelState>(); + private ChannelFuture channelFuture; + private Channel channel; public NettyProducer(NettyEndpoint nettyEndpoint, NettyConfiguration configuration) { super(nettyEndpoint); @@ -157,11 +160,16 @@ public class NettyProducer extends Defau exchange.setProperty(Exchange.CHARSET_NAME, IOHelper.normalizeCharset(getConfiguration().getCharsetName())); } - ChannelFuture channelFuture; - final Channel channel; try { - channelFuture = openConnection(exchange, callback); - channel = openChannel(channelFuture); + // allow to reuse channel, on this producer, to avoid creating a new connection + // for each message being sent + if (channelFuture == null || channel == null || !channel.isOpen()) { + channelFuture = openConnection(); + channel = openChannel(channelFuture); + } + // setup state now we have the channel we can do this because + // this producer is not thread safe, but pooled using ServicePoolAware + state.set(channel, new NettyCamelState(callback, exchange)); } catch (Exception e) { exchange.setException(e); callback.done(true); @@ -218,6 +226,21 @@ public class NettyProducer extends Defau return false; } + /** + * To get the {@link NettyCamelState} from this producer. + */ + public NettyCamelState getState(Channel channel) { + return state.get(channel); + } + + /** + * To remove the {@link NettyCamelState} stored on this producer, + * when no longer needed + */ + public void removeState(Channel channel) { + state.remove(channel); + } + protected void setupTCPCommunication() throws Exception { if (channelFactory == null) { bossExecutor = context.getExecutorServiceManager().newCachedThreadPool(this, "NettyTCPBoss"); @@ -233,19 +256,17 @@ public class NettyProducer extends Defau } } - private ChannelFuture openConnection(Exchange exchange, AsyncCallback callback) throws Exception { + private ChannelFuture openConnection() throws Exception { ChannelFuture answer; ChannelPipeline clientPipeline; if (configuration.getClientPipelineFactory() != null) { // initialize user defined client pipeline factory configuration.getClientPipelineFactory().setProducer(this); - configuration.getClientPipelineFactory().setExchange(exchange); - configuration.getClientPipelineFactory().setCallback(callback); clientPipeline = configuration.getClientPipelineFactory().getPipeline(); } else { // initialize client pipeline factory - ClientPipelineFactory clientPipelineFactory = new DefaultClientPipelineFactory(this, exchange, callback); + ClientPipelineFactory clientPipelineFactory = new DefaultClientPipelineFactory(this); // must get the pipeline from the factory when opening a new connection clientPipeline = clientPipelineFactory.getPipeline(); } @@ -295,13 +316,10 @@ public class NettyProducer extends Defau } private void openAndCloseConnection() throws Exception { - ChannelFuture future = openConnection(new DefaultExchange(context), new AsyncCallback() { - public void done(boolean doneSync) { - // noop - } - }); + ChannelFuture future = openConnection(); Channel channel = openChannel(future); NettyHelper.close(channel); + ALL_CHANNELS.remove(channel); } public NettyConfiguration getConfiguration() { Modified: camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java?rev=1310993&r1=1310992&r2=1310993&view=diff ============================================================================== --- camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java (original) +++ camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java Sun Apr 8 12:54:55 2012 @@ -20,6 +20,7 @@ import org.apache.camel.AsyncCallback; import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; import org.apache.camel.NoTypeConversionAvailableException; +import org.apache.camel.component.netty.NettyCamelState; import org.apache.camel.component.netty.NettyConstants; import org.apache.camel.component.netty.NettyHelper; import org.apache.camel.component.netty.NettyPayloadHelper; @@ -39,15 +40,11 @@ import org.slf4j.LoggerFactory; public class ClientChannelHandler extends SimpleChannelUpstreamHandler { private static final transient Logger LOG = LoggerFactory.getLogger(ClientChannelHandler.class); private final NettyProducer producer; - private final Exchange exchange; - private final AsyncCallback callback; private boolean messageReceived; private volatile boolean exceptionHandled; - public ClientChannelHandler(NettyProducer producer, Exchange exchange, AsyncCallback callback) { + public ClientChannelHandler(NettyProducer producer) { this.producer = producer; - this.exchange = exchange; - this.callback = callback; } @Override @@ -73,20 +70,33 @@ public class ClientChannelHandler extend if (LOG.isDebugEnabled()) { LOG.debug("Closing channel as an exception was thrown from Netty", cause); } - // set the cause on the exchange - exchange.setException(cause); - // close channel in case an exception was thrown - NettyHelper.close(exceptionEvent.getChannel()); + Exchange exchange = getExchange(ctx); + AsyncCallback callback = getAsyncCallback(ctx); - // signal callback - callback.done(false); + // the state may not be set + if (exchange != null && callback != null) { + // set the cause on the exchange + exchange.setException(cause); + + // close channel in case an exception was thrown + NettyHelper.close(exceptionEvent.getChannel()); + + // signal callback + callback.done(false); + } } @Override public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { LOG.trace("Channel closed: {}", ctx.getChannel()); + Exchange exchange = getExchange(ctx); + AsyncCallback callback = getAsyncCallback(ctx); + + // remove state + producer.removeState(ctx.getChannel()); + if (producer.getConfiguration().isSync() && !messageReceived && !exceptionHandled) { // session was closed but no message received. This could be because the remote server had an internal error // and could not return a response. We should count down to stop waiting for a response @@ -103,6 +113,9 @@ public class ClientChannelHandler extend public void messageReceived(ChannelHandlerContext ctx, MessageEvent messageEvent) throws Exception { messageReceived = true; + Exchange exchange = getExchange(ctx); + AsyncCallback callback = getAsyncCallback(ctx); + Object body = messageEvent.getMessage(); LOG.debug("Message received: {}", body); @@ -150,4 +163,14 @@ public class ClientChannelHandler extend } } + private Exchange getExchange(ChannelHandlerContext ctx) { + NettyCamelState state = producer.getState(ctx.getChannel()); + return state != null ? state.getExchange() : null; + } + + private AsyncCallback getAsyncCallback(ChannelHandlerContext ctx) { + NettyCamelState state = producer.getState(ctx.getChannel()); + return state != null ? state.getCallback() : null; + } + } Modified: camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java?rev=1310993&r1=1310992&r2=1310993&view=diff ============================================================================== --- camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java (original) +++ camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java Sun Apr 8 12:54:55 2012 @@ -25,7 +25,6 @@ import org.apache.camel.component.netty. import org.apache.camel.util.CamelLogger; import org.apache.camel.util.ExchangeHelper; import org.apache.camel.util.IOHelper; -import org.jboss.netty.channel.ChannelHandler; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.ExceptionEvent; @@ -35,9 +34,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Server handler which is shared + * Client handler which cannot be shared */ -@ChannelHandler.Sharable public class ServerChannelHandler extends SimpleChannelUpstreamHandler { private static final transient Logger LOG = LoggerFactory.getLogger(ServerChannelHandler.class); private NettyConsumer consumer; Modified: camel/branches/camel-2.9.x/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactoryAsynchTest.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactoryAsynchTest.java?rev=1310993&r1=1310992&r2=1310993&view=diff ============================================================================== --- camel/branches/camel-2.9.x/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactoryAsynchTest.java (original) +++ camel/branches/camel-2.9.x/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactoryAsynchTest.java Sun Apr 8 12:54:55 2012 @@ -97,10 +97,9 @@ public class NettyCustomPipelineFactoryA channelPipeline.addLast("decoder-DELIM", new DelimiterBasedFrameDecoder(maxLineSize, true, Delimiters.lineDelimiter())); channelPipeline.addLast("decoder-SD", new StringDecoder(CharsetUtil.UTF_8)); channelPipeline.addLast("encoder-SD", new StringEncoder(CharsetUtil.UTF_8)); - channelPipeline.addLast("handler", new ClientChannelHandler(producer, exchange, callback)); + channelPipeline.addLast("handler", new ClientChannelHandler(producer)); return channelPipeline; - } public boolean isfactoryInvoked() { Modified: camel/branches/camel-2.9.x/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java?rev=1310993&r1=1310992&r2=1310993&view=diff ============================================================================== --- camel/branches/camel-2.9.x/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java (original) +++ camel/branches/camel-2.9.x/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java Sun Apr 8 12:54:55 2012 @@ -97,7 +97,7 @@ public class NettyCustomPipelineFactoryS channelPipeline.addLast("decoder-DELIM", new DelimiterBasedFrameDecoder(maxLineSize, true, Delimiters.lineDelimiter())); channelPipeline.addLast("decoder-SD", new StringDecoder(CharsetUtil.UTF_8)); channelPipeline.addLast("encoder-SD", new StringEncoder(CharsetUtil.UTF_8)); - channelPipeline.addLast("handler", new ClientChannelHandler(producer, exchange, callback)); + channelPipeline.addLast("handler", new ClientChannelHandler(producer)); return channelPipeline; }