Author: davsclaus
Date: Thu Dec 20 10:48:54 2012
New Revision: 1424399

URL: http://svn.apache.org/viewvc?rev=1424399&view=rev
Log:
CAMEL-5901: Allow to turn on|off netty producer pool.

Added:
    camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/SharedSingletonObjectPool.java
      - copied unchanged from r1424398, camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/SharedSingletonObjectPool.java
    camel/branches/camel-2.10.x/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyProducerPoolDisabledTest.java
      - copied unchanged from r1424398, camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyProducerPoolDisabledTest.java
Modified:
    camel/branches/camel-2.10.x/   (props changed)
    camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
    camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java

Propchange: camel/branches/camel-2.10.x/
------------------------------------------------------------------------------
  Merged /camel/trunk:r1424398

Propchange: camel/branches/camel-2.10.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java?rev=1424399&r1=1424398&r2=1424399&view=diff ============================================================================== --- camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java (original) +++ camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java Thu Dec 20 10:48:54 2012 @@ -83,9 +83,10 @@ public class NettyConfiguration implemen private int producerPoolMinIdle; private int producerPoolMaxIdle = 100; private long producerPoolMinEvictableIdle = 5 * 60 * 1000L; + private boolean producerPoolEnabled = true; private int backlog; private Map<String, Object> options; - + /** * Returns a copy of this configuration */ @@ -582,6 +583,14 @@ public class NettyConfiguration implemen this.producerPoolMinEvictableIdle = producerPoolMinEvictableIdle; } + public boolean isProducerPoolEnabled() { + return producerPoolEnabled; + } + + public void setProducerPoolEnabled(boolean producerPoolEnabled) { + this.producerPoolEnabled = producerPoolEnabled; + } + public int getBacklog() { return backlog; } Modified: camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=1424399&r1=1424398&r2=1424399&view=diff ============================================================================== --- camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java (original) +++ 
camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java Thu Dec 20 10:48:54 2012 @@ -91,20 +91,32 @@ public class NettyProducer extends Defau protected void doStart() throws Exception { super.doStart(); - // setup pool where we want an unbounded pool, which allows the pool to shrink on no demand - GenericObjectPool.Config config = new GenericObjectPool.Config(); - config.maxActive = configuration.getProducerPoolMaxActive(); - config.minIdle = configuration.getProducerPoolMinIdle(); - config.maxIdle = configuration.getProducerPoolMaxIdle(); - // we should test on borrow to ensure the channel is still valid - config.testOnBorrow = true; - // only evict channels which are no longer valid - config.testWhileIdle = true; - // run eviction every 30th second - config.timeBetweenEvictionRunsMillis = 30 * 1000L; - config.minEvictableIdleTimeMillis = configuration.getProducerPoolMinEvictableIdle(); - config.whenExhaustedAction = GenericObjectPool.WHEN_EXHAUSTED_FAIL; - pool = new GenericObjectPool<Channel>(new NettyProducerPoolableObjectFactory(), config); + if (configuration.isProducerPoolEnabled()) { + // setup pool where we want an unbounded pool, which allows the pool to shrink on no demand + GenericObjectPool.Config config = new GenericObjectPool.Config(); + config.maxActive = configuration.getProducerPoolMaxActive(); + config.minIdle = configuration.getProducerPoolMinIdle(); + config.maxIdle = configuration.getProducerPoolMaxIdle(); + // we should test on borrow to ensure the channel is still valid + config.testOnBorrow = true; + // only evict channels which are no longer valid + config.testWhileIdle = true; + // run eviction every 30th second + config.timeBetweenEvictionRunsMillis = 30 * 1000L; + config.minEvictableIdleTimeMillis = configuration.getProducerPoolMinEvictableIdle(); + config.whenExhaustedAction = GenericObjectPool.WHEN_EXHAUSTED_FAIL; + pool = new GenericObjectPool<Channel>(new 
NettyProducerPoolableObjectFactory(), config); + + if (LOG.isDebugEnabled()) { + LOG.debug("Created NettyProducer pool[maxActive={}, minIdle={}, maxIdle={}, minEvictableIdleTimeMillis={}] -> {}", + new Object[]{config.maxActive, config.minIdle, config.maxIdle, config.minEvictableIdleTimeMillis, pool}); + } + } else { + pool = new SharedSingletonObjectPool<Channel>(new NettyProducerPoolableObjectFactory()); + if (LOG.isDebugEnabled()) { + LOG.info("Created NettyProducer shared singleton pool -> {}", pool); + } + } // setup pipeline factory ClientPipelineFactory factory = configuration.getClientPipelineFactory(); @@ -122,7 +134,8 @@ public class NettyProducer extends Defau if (!configuration.isLazyChannelCreation()) { // ensure the connection can be established when we start up - openAndCloseConnection(); + Channel channel = pool.borrowObject(); + pool.returnObject(channel); } } @@ -147,10 +160,13 @@ public class NettyProducer extends Defau context.getExecutorServiceManager().shutdownNow(workerExecutor); } - if (LOG.isDebugEnabled()) { - LOG.debug("Stopping producer with channel pool[active={}, idle={}]", pool.getNumActive(), pool.getNumIdle()); + if (pool != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Stopping producer with channel pool[active={}, idle={}]", pool.getNumActive(), pool.getNumIdle()); + } + pool.close(); + pool = null; } - pool.close(); super.doStop(); } @@ -360,7 +376,7 @@ public class NettyProducer extends Defau } } - private Channel openChannel(ChannelFuture channelFuture) throws Exception { + protected Channel openChannel(ChannelFuture channelFuture) throws Exception { // blocking for channel to be done if (LOG.isTraceEnabled()) { LOG.trace("Waiting for operation to complete {} for {} millis", channelFuture, configuration.getConnectTimeout()); @@ -380,13 +396,6 @@ public class NettyProducer extends Defau return answer; } - private void openAndCloseConnection() throws Exception { - ChannelFuture future = openConnection(); - Channel channel = 
openChannel(future); - NettyHelper.close(channel); - ALL_CHANNELS.remove(channel); - } - public NettyConfiguration getConfiguration() { return configuration; }