This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch exchange-factory in repository https://gitbox.apache.org/repos/asf/camel.git
commit ca506d7fdfcd340562e64998969162ca3c000784 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Thu Feb 18 15:38:33 2021 +0100 CAMEL-16222: PooledExchangeFactory experiment --- .../java/org/apache/camel/spi/ExchangeFactory.java | 9 ++++++ .../camel/impl/engine/AbstractCamelContext.java | 1 + .../camel/impl/engine/DefaultExchangeFactory.java | 11 ++++--- .../camel/impl/engine/PooledExchangeFactory.java | 36 ++++++++++++++-------- .../org/apache/camel/support/DefaultConsumer.java | 20 ++++++++---- 5 files changed, 55 insertions(+), 22 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java index 95586c4..8e429f2 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java @@ -16,6 +16,7 @@ */ package org.apache.camel.spi; +import org.apache.camel.Consumer; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; @@ -38,6 +39,14 @@ public interface ExchangeFactory { String FACTORY = "exchange-factory"; /** + * Creates a new {@link ExchangeFactory} that is private for the given consumer. + * + * @param consumer the consumer that will use the created {@link ExchangeFactory} + * @return the created factory. + */ + ExchangeFactory newExchangeFactory(Consumer consumer); + + /** * Gets a new {@link Exchange} * * @param autoRelease whether to auto release the exchange when routing is complete via {@link UnitOfWork} diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java index 3e1451f..22195da 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java @@ -3624,6 +3624,7 @@ public abstract class AbstractCamelContext extends BaseService getDataFormatResolver(); getExecutorServiceManager(); + getExchangeFactory(); getShutdownStrategy(); getUuidGenerator(); diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java index ec17772..a8db865 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java @@ -16,10 +16,7 @@ */ package org.apache.camel.impl.engine; -import org.apache.camel.CamelContext; -import org.apache.camel.CamelContextAware; -import org.apache.camel.Endpoint; -import org.apache.camel.Exchange; +import org.apache.camel.*; import org.apache.camel.spi.ExchangeFactory; import org.apache.camel.support.DefaultExchange; @@ -41,6 +38,12 @@ public class DefaultExchangeFactory implements ExchangeFactory, CamelContextAwar } @Override + public ExchangeFactory newExchangeFactory(Consumer consumer) { + // we just use a shared factory + return this; + } + + @Override public Exchange create(boolean autoRelease) { return new DefaultExchange(camelContext); } diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java index 30094cd..0541691 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java @@ -19,18 +19,12 @@ package org.apache.camel.impl.engine; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicLong; -import org.apache.camel.CamelContext; -import org.apache.camel.CamelContextAware; -import org.apache.camel.Endpoint; -import org.apache.camel.Exchange; -import org.apache.camel.Experimental; -import org.apache.camel.ExtendedExchange; -import org.apache.camel.NonManagedService; -import org.apache.camel.StaticService; +import org.apache.camel.*; import org.apache.camel.spi.ExchangeFactory; import org.apache.camel.support.DefaultExchange; import org.apache.camel.support.SynchronizationAdapter; import org.apache.camel.support.service.ServiceSupport; +import org.apache.camel.util.URISupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,6 +37,7 @@ public class PooledExchangeFactory extends ServiceSupport private static final Logger LOG = LoggerFactory.getLogger(PooledExchangeFactory.class); + private final Consumer consumer; private final ReleaseOnCompletion onCompletion = new ReleaseOnCompletion(); private final ConcurrentLinkedQueue<Exchange> pool = new ConcurrentLinkedQueue<>(); private final AtomicLong acquired = new AtomicLong(); @@ -53,6 +48,16 @@ public class PooledExchangeFactory extends ServiceSupport private CamelContext camelContext; private boolean statisticsEnabled = true; + public PooledExchangeFactory() { + this.consumer = null; + } + + private PooledExchangeFactory(Consumer consumer, CamelContext camelContext, boolean statisticsEnabled) { + this.consumer = consumer; + this.camelContext = camelContext; + this.statisticsEnabled = statisticsEnabled; + } + @Override public CamelContext getCamelContext() { return camelContext; @@ -63,6 +68,11 @@ public class PooledExchangeFactory extends ServiceSupport this.camelContext = camelContext; } + @Override + public ExchangeFactory newExchangeFactory(Consumer consumer) { + return new PooledExchangeFactory(consumer, camelContext, statisticsEnabled); + } + public boolean isStatisticsEnabled() { return statisticsEnabled; } @@ -132,7 +142,6 @@ public class PooledExchangeFactory extends ServiceSupport if (statisticsEnabled) { discarded.incrementAndGet(); } - // ignore LOG.debug("Error resetting exchange: {}. This exchange is discarded.", exchange); } } @@ -141,9 +150,12 @@ public class PooledExchangeFactory extends ServiceSupport protected void doStop() throws Exception { pool.clear(); - if (statisticsEnabled) { - LOG.info("PooledExchangeFactory usage [created: {}, acquired: {}, released: {}, discarded: {}]", - created.get(), acquired.get(), released.get(), discarded.get()); + if (statisticsEnabled && consumer != null) { + String uri = consumer.getEndpoint().getEndpointBaseUri(); + uri = URISupport.sanitizeUri(uri); + + LOG.info("PooledExchangeFactory ({}) usage [created: {}, reused: {}, released: {}, discarded: {}]", + uri, created.get(), acquired.get(), released.get(), discarded.get()); } created.set(0); diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java index c8e594f..ee12829 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java @@ -56,7 +56,9 @@ public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAw this.processor = processor; this.asyncProcessor = AsyncProcessorConverterHelper.convert(processor); this.exceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), getClass()); - this.exchangeFactory = endpoint.getCamelContext().adapt(ExtendedCamelContext.class).getExchangeFactory(); + // create a per consumer exchange factory + this.exchangeFactory = endpoint.getCamelContext().adapt(ExtendedCamelContext.class) + .getExchangeFactory().newExchangeFactory(this); } @Override @@ -167,19 +169,25 @@ public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAw @Override protected void doInit() throws Exception { LOG.debug("Init consumer: {}", this); - ServiceHelper.initService(processor); + ServiceHelper.initService(exchangeFactory, processor); + } + + @Override + protected void doStart() throws Exception { + LOG.debug("Starting consumer: {}", this); + ServiceHelper.startService(exchangeFactory, processor); } @Override protected void doStop() throws Exception { LOG.debug("Stopping consumer: {}", this); - ServiceHelper.stopService(processor); + ServiceHelper.stopService(exchangeFactory, processor); } @Override - protected void doStart() throws Exception { - LOG.debug("Starting consumer: {}", this); - ServiceHelper.startService(processor); + protected void doShutdown() throws Exception { + LOG.debug("Shutting down consumer: {}", this); + ServiceHelper.stopAndShutdownServices(exchangeFactory, processor); } /**