This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch exchange-factory in repository https://gitbox.apache.org/repos/asf/camel.git
commit 161ed6966a7c8185fc7818b51d0bc58fde19ec1b Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Wed Feb 24 12:36:54 2021 +0100 CAMEL-16222: PooledExchangeFactory experiment --- .../org/apache/camel/ExtendedCamelContext.java | 11 ++ .../java/org/apache/camel/spi/ExchangeFactory.java | 73 +++++++++- .../apache/camel/spi/ExchangeFactoryManager.java | 78 ++++++++++ .../camel/impl/engine/AbstractCamelContext.java | 26 +++- .../camel/impl/engine/DefaultExchangeFactory.java | 161 ++++++++++++++++++++- .../impl/engine/DefaultExchangeFactoryManager.java | 101 +++++++++++++ .../camel/impl/engine/PooledExchangeFactory.java | 110 ++++++-------- .../camel/impl/engine/SimpleCamelContext.java | 6 + .../camel/impl/ExtendedCamelContextConfigurer.java | 6 + .../camel/impl/lw/LightweightCamelContext.java | 11 ++ .../impl/lw/LightweightRuntimeCamelContext.java | 13 ++ .../api/management/mbean/CamelOpenMBeanTypes.java | 13 ++ .../mbean/ManagedExchangeFactoryManagerMBean.java | 44 ++++++ .../management/JmxManagementLifecycleStrategy.java | 4 + .../mbean/ManagedExchangeFactoryManager.java | 110 ++++++++++++++ .../org/apache/camel/support/DefaultConsumer.java | 5 +- 16 files changed, 693 insertions(+), 79 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java b/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java index 6cc0fc5..4689df9 100644 --- a/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java +++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java @@ -37,6 +37,7 @@ import org.apache.camel.spi.DeferServiceFactory; import org.apache.camel.spi.EndpointStrategy; import org.apache.camel.spi.EndpointUriFactory; import org.apache.camel.spi.ExchangeFactory; +import org.apache.camel.spi.ExchangeFactoryManager; import org.apache.camel.spi.FactoryFinder; import org.apache.camel.spi.FactoryFinderResolver; import org.apache.camel.spi.HeadersMapFactory; @@ -225,6 +226,16 @@ public 
interface ExtendedCamelContext extends CamelContext { void setExchangeFactory(ExchangeFactory exchangeFactory); /** + * Gets the exchange factory manager to use. + */ + ExchangeFactoryManager getExchangeFactoryManager(); + + /** + * Sets a custom exchange factory manager to use. + */ + void setExchangeFactoryManager(ExchangeFactoryManager exchangeFactoryManager); + + /** + * Returns the bean post processor used to do any bean customization. * * @return the bean post processor. diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java index e2047e1..ac8d7e6 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java @@ -16,9 +16,12 @@ */ package org.apache.camel.spi; +import org.apache.camel.CamelContextAware; import org.apache.camel.Consumer; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; +import org.apache.camel.NonManagedService; +import org.apache.camel.Service; /** * Factory used by {@link Consumer} to create Camel {@link Exchange} holding the incoming message received by the @@ -33,7 +36,50 @@ import org.apache.camel.Exchange; * The factory is pluggable which allows to use different strategies. The default factory will create a new * {@link Exchange} instance, and the pooled factory will pool and reuse exchanges. */ -public interface ExchangeFactory { +public interface ExchangeFactory extends Service, CamelContextAware, NonManagedService { + + /** + * Utilization statistics of this factory. + */ + interface Statistics { + + /** + * Number of new exchanges created. + */ + long getCreatedCounter(); + + /** + * Number of exchanges acquired (reused) when using pooled factory. 
+ */ + long getAcquiredCounter(); + + /** + * Number of exchanges released back to pool + */ + long getReleasedCounter(); + + /** + * Number of exchanges discarded (thrown away) such as if no space in cache pool. + */ + long getDiscardedCounter(); + + /** + * Reset the counters + */ + void reset(); + + /** + * Whether statistics is enabled. + */ + boolean isStatisticsEnabled(); + + /** + * Sets whether statistics is enabled. + * + * @param statisticsEnabled <tt>true</tt> to enable + */ + void setStatisticsEnabled(boolean statisticsEnabled); + } /** * Service factory key. @@ -41,6 +87,11 @@ public interface ExchangeFactory { String FACTORY = "exchange-factory"; /** + * The consumer using this factory. + */ + Consumer getConsumer(); + + /** * Creates a new {@link ExchangeFactory} that is private for the given consumer. * * @param consumer the consumer that will use the created {@link ExchangeFactory} @@ -79,6 +130,11 @@ public interface ExchangeFactory { int getCapacity(); /** + * The current number of exchanges in the pool + */ + int getSize(); + + /** * The capacity the pool (for each consumer) uses for storing exchanges. The default capacity is 100. */ void setCapacity(int capacity); @@ -93,4 +149,19 @@ public interface ExchangeFactory { */ void setStatisticsEnabled(boolean statisticsEnabled); + /** + * Reset the statistics + */ + void resetStatistics(); + + /** + * Purges the internal cache (if pooled) + */ + void purge(); + + /** + * Gets the usage statistics + */ + Statistics getStatistics(); + } diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactoryManager.java b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactoryManager.java new file mode 100644 index 0000000..a46884a --- /dev/null +++ b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactoryManager.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.spi; + +import java.util.Collection; + +import org.apache.camel.StaticService; + +/** + * Manages {@link ExchangeFactory}. + */ +public interface ExchangeFactoryManager extends StaticService { + + /** + * Adds the {@link ExchangeFactory} to be managed. + * + * @param exchangeFactory the exchange factory + */ + void addExchangeFactory(ExchangeFactory exchangeFactory); + + /** + * Removes the {@link ExchangeFactory} from being managed (such as when a route is stopped/removed) or during + * shutdown. + * + * @param exchangeFactory the exchange factory + */ + void removeExchangeFactory(ExchangeFactory exchangeFactory); + + /** + * Returns a read-only view of the managed factories. + */ + Collection<ExchangeFactory> getExchangeFactories(); + + /** + * Number of consumers currently being managed + */ + int getSize(); + + /** + * The capacity the pool (for each consumer) uses for storing exchanges. The default capacity is 100. + */ + int getCapacity(); + + /** + * Whether statistics is enabled. + */ + boolean isStatisticsEnabled(); + + /** + * Sets whether statistics is enabled. 
+ */ + void setStatisticsEnabled(boolean statisticsEnabled); + + /** + * Reset the statistics + */ + void resetStatistics(); + + /** + * Purges the internal caches (if pooled) + */ + void purge(); + +} diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java index 22195da..305bfa9 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java @@ -102,6 +102,7 @@ import org.apache.camel.spi.EndpointStrategy; import org.apache.camel.spi.EndpointUriFactory; import org.apache.camel.spi.EventNotifier; import org.apache.camel.spi.ExchangeFactory; +import org.apache.camel.spi.ExchangeFactoryManager; import org.apache.camel.spi.ExecutorServiceManager; import org.apache.camel.spi.FactoryFinder; import org.apache.camel.spi.FactoryFinderResolver; @@ -265,6 +266,7 @@ public abstract class AbstractCamelContext extends BaseService private volatile String version; private volatile PropertiesComponent propertiesComponent; private volatile CamelContextNameStrategy nameStrategy; + private volatile ExchangeFactoryManager exchangeFactoryManager = new DefaultExchangeFactoryManager(); private volatile ExchangeFactory exchangeFactory; private volatile ReactiveExecutor reactiveExecutor; private volatile ManagementNameStrategy managementNameStrategy; @@ -3684,6 +3686,7 @@ public abstract class AbstractCamelContext extends BaseService reactiveExecutor = null; asyncProcessorAwaitManager = null; exchangeFactory = null; + exchangeFactoryManager = null; } /** @@ -4657,7 +4660,26 @@ public abstract class AbstractCamelContext extends BaseService @Override public void setExchangeFactory(ExchangeFactory exchangeFactory) { - this.exchangeFactory = doAddService(exchangeFactory); + // automatic inject camel context + 
exchangeFactory.setCamelContext(this); + this.exchangeFactory = exchangeFactory; + } + + @Override + public ExchangeFactoryManager getExchangeFactoryManager() { + if (exchangeFactoryManager == null) { + synchronized (lock) { + if (exchangeFactoryManager == null) { + setExchangeFactoryManager(createExchangeFactoryManager()); + } + } + } + return exchangeFactoryManager; + } + + @Override + public void setExchangeFactoryManager(ExchangeFactoryManager exchangeFactoryManager) { + this.exchangeFactoryManager = doAddService(exchangeFactoryManager); } @Override @@ -4754,6 +4776,8 @@ public abstract class AbstractCamelContext extends BaseService protected abstract ExchangeFactory createExchangeFactory(); + protected abstract ExchangeFactoryManager createExchangeFactoryManager(); + protected abstract HealthCheckRegistry createHealthCheckRegistry(); protected abstract ReactiveExecutor createReactiveExecutor(); diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java index 1ca6740..ae7e601 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java @@ -16,20 +16,51 @@ */ package org.apache.camel.impl.engine; +import java.util.concurrent.atomic.LongAdder; + import org.apache.camel.CamelContext; import org.apache.camel.CamelContextAware; import org.apache.camel.Consumer; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedCamelContext; import org.apache.camel.spi.ExchangeFactory; +import org.apache.camel.spi.ExchangeFactoryManager; import org.apache.camel.support.DefaultExchange; +import org.apache.camel.support.service.ServiceSupport; +import org.apache.camel.util.URISupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * 
Default {@link ExchangeFactory} that creates a new {@link Exchange} instance. */ -public final class DefaultExchangeFactory implements ExchangeFactory, CamelContextAware { +public class DefaultExchangeFactory extends ServiceSupport implements ExchangeFactory, CamelContextAware { + + private static final Logger LOG = LoggerFactory.getLogger(DefaultExchangeFactory.class); - private CamelContext camelContext; + final UtilizationStatistics statistics = new UtilizationStatistics(); + final Consumer consumer; + CamelContext camelContext; + ExchangeFactoryManager exchangeFactoryManager; + + public DefaultExchangeFactory() { + this.consumer = null; + } + + public DefaultExchangeFactory(Consumer consumer) { + this.consumer = consumer; + } + + @Override + protected void doBuild() throws Exception { + this.exchangeFactoryManager = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactoryManager(); + } + + @Override + public Consumer getConsumer() { + return consumer; + } @Override public CamelContext getCamelContext() { @@ -43,28 +74,44 @@ public final class DefaultExchangeFactory implements ExchangeFactory, CamelConte @Override public ExchangeFactory newExchangeFactory(Consumer consumer) { - // we just use a shared factory - return this; + DefaultExchangeFactory answer = new DefaultExchangeFactory(consumer); + answer.setStatisticsEnabled(statistics.isStatisticsEnabled()); + answer.setCamelContext(camelContext); + return answer; } @Override public Exchange create(boolean autoRelease) { + if (statistics.isStatisticsEnabled()) { + statistics.created.increment(); + } return new DefaultExchange(camelContext); } @Override public Exchange create(Endpoint fromEndpoint, boolean autoRelease) { + if (statistics.isStatisticsEnabled()) { + statistics.created.increment(); + } return new DefaultExchange(fromEndpoint); } @Override + public boolean release(Exchange exchange) { + if (statistics.isStatisticsEnabled()) { + statistics.released.increment(); + } + return true; + } + + 
@Override public boolean isStatisticsEnabled() { - return false; + return statistics.isStatisticsEnabled(); } @Override public void setStatisticsEnabled(boolean statisticsEnabled) { - // not in use + statistics.setStatisticsEnabled(statisticsEnabled); } @Override @@ -73,7 +120,109 @@ public final class DefaultExchangeFactory implements ExchangeFactory, CamelConte } @Override + public int getSize() { + return 0; + } + + @Override public void setCapacity(int capacity) { // not in use } + + @Override + public void resetStatistics() { + statistics.reset(); + } + + @Override + public void purge() { + // not in use + } + + @Override + public Statistics getStatistics() { + return statistics; + } + + @Override + protected void doStart() throws Exception { + exchangeFactoryManager.addExchangeFactory(this); + } + + @Override + protected void doStop() throws Exception { + exchangeFactoryManager.removeExchangeFactory(this); + logUsageSummary(LOG, "DefaultExchangeFactory", 0); + statistics.reset(); + } + + void logUsageSummary(Logger log, String name, int pooled) { + if (statistics.isStatisticsEnabled() && consumer != null) { + // only log if there is any usage + long created = statistics.getCreatedCounter(); + long acquired = statistics.getAcquiredCounter(); + long released = statistics.getReleasedCounter(); + long discarded = statistics.getDiscardedCounter(); + boolean shouldLog = pooled > 0 || created > 0 || acquired > 0 || released > 0 || discarded > 0; + if (shouldLog) { + String uri = consumer.getEndpoint().getEndpointBaseUri(); + uri = URISupport.sanitizeUri(uri); + + LOG.info("{} ({}) usage [pooled: {}, created: {}, acquired: {} released: {}, discarded: {}]", + name, uri, pooled, created, acquired, released, discarded); + } + } + } + + /** + * Represents utilization statistics + */ + final class UtilizationStatistics implements ExchangeFactory.Statistics { + + private boolean statisticsEnabled; + + final LongAdder created = new LongAdder(); + final LongAdder acquired = 
new LongAdder(); + final LongAdder released = new LongAdder(); + final LongAdder discarded = new LongAdder(); + + @Override + public void reset() { + created.reset(); + acquired.reset(); + released.reset(); + discarded.reset(); + } + + @Override + public long getCreatedCounter() { + return created.longValue(); + } + + @Override + public long getAcquiredCounter() { + return acquired.longValue(); + } + + @Override + public long getReleasedCounter() { + return released.longValue(); + } + + @Override + public long getDiscardedCounter() { + return discarded.longValue(); + } + + @Override + public boolean isStatisticsEnabled() { + return statisticsEnabled; + } + + @Override + public void setStatisticsEnabled(boolean statisticsEnabled) { + this.statisticsEnabled = statisticsEnabled; + } + } + } diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactoryManager.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactoryManager.java new file mode 100644 index 0000000..7b5ffc1 --- /dev/null +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactoryManager.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl.engine; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; +import org.apache.camel.Consumer; +import org.apache.camel.spi.ExchangeFactory; +import org.apache.camel.spi.ExchangeFactoryManager; +import org.apache.camel.support.service.ServiceSupport; + +public class DefaultExchangeFactoryManager extends ServiceSupport implements ExchangeFactoryManager, CamelContextAware { + + private CamelContext camelContext; + private final Map<Consumer, ExchangeFactory> factories = new ConcurrentHashMap<>(); + private int capacity; + private boolean statisticsEnabled; + + public CamelContext getCamelContext() { + return camelContext; + } + + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; + } + + @Override + public void addExchangeFactory(ExchangeFactory exchangeFactory) { + factories.put(exchangeFactory.getConsumer(), exchangeFactory); + // same for all factories + capacity = exchangeFactory.getCapacity(); + statisticsEnabled = exchangeFactory.isStatisticsEnabled(); + } + + @Override + public void removeExchangeFactory(ExchangeFactory exchangeFactory) { + factories.remove(exchangeFactory.getConsumer()); + } + + @Override + public Collection<ExchangeFactory> getExchangeFactories() { + return Collections.unmodifiableCollection(factories.values()); + } + + @Override + public int getSize() { + return factories.size(); + } + + @Override + public int getCapacity() { + return capacity; + } + + @Override + public boolean isStatisticsEnabled() { + return statisticsEnabled; + } + + @Override + public void setStatisticsEnabled(boolean statisticsEnabled) { + this.statisticsEnabled = statisticsEnabled; + for (ExchangeFactory ef : 
factories.values()) { + ef.setStatisticsEnabled(statisticsEnabled); + } + } + + @Override + public void resetStatistics() { + factories.values().forEach(ExchangeFactory::resetStatistics); + } + + @Override + public void purge() { + factories.values().forEach(ExchangeFactory::purge); + } + + @Override + protected void doShutdown() throws Exception { + factories.clear(); + } +} diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java index dcd0a03..e83fbfd 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java @@ -18,102 +18,85 @@ package org.apache.camel.impl.engine; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.atomic.AtomicLong; -import org.apache.camel.CamelContext; -import org.apache.camel.CamelContextAware; import org.apache.camel.Consumer; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; -import org.apache.camel.NonManagedService; import org.apache.camel.PooledExchange; -import org.apache.camel.StaticService; import org.apache.camel.spi.ExchangeFactory; import org.apache.camel.support.DefaultPooledExchange; -import org.apache.camel.support.service.ServiceSupport; -import org.apache.camel.util.URISupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Pooled {@link ExchangeFactory} that reuses {@link Exchange} instance from a pool. 
*/ -public final class PooledExchangeFactory extends ServiceSupport - implements ExchangeFactory, CamelContextAware, StaticService, NonManagedService { +public final class PooledExchangeFactory extends DefaultExchangeFactory { private static final Logger LOG = LoggerFactory.getLogger(PooledExchangeFactory.class); private final ReleaseOnDoneTask onDone = new ReleaseOnDoneTask(); private final Consumer consumer; private BlockingQueue<Exchange> pool; - private final AtomicLong acquired = new AtomicLong(); - private final AtomicLong created = new AtomicLong(); - private final AtomicLong released = new AtomicLong(); - private final AtomicLong discarded = new AtomicLong(); - - private CamelContext camelContext; private int capacity = 100; - private boolean statisticsEnabled; public PooledExchangeFactory() { this.consumer = null; } - private PooledExchangeFactory(Consumer consumer, CamelContext camelContext, boolean statisticsEnabled, int capacity) { + public PooledExchangeFactory(Consumer consumer) { this.consumer = consumer; - this.camelContext = camelContext; - this.statisticsEnabled = statisticsEnabled; - this.capacity = capacity; } @Override protected void doBuild() throws Exception { + super.doBuild(); this.pool = new ArrayBlockingQueue<>(capacity); } @Override - public CamelContext getCamelContext() { - return camelContext; - } - - @Override - public void setCamelContext(CamelContext camelContext) { - this.camelContext = camelContext; + public Consumer getConsumer() { + return consumer; } @Override public ExchangeFactory newExchangeFactory(Consumer consumer) { - return new PooledExchangeFactory(consumer, camelContext, statisticsEnabled, capacity); + PooledExchangeFactory answer = new PooledExchangeFactory(consumer); + answer.setCamelContext(camelContext); + answer.setCapacity(capacity); + answer.setStatisticsEnabled(isStatisticsEnabled()); + return answer; } public int getCapacity() { return capacity; } - public void setCapacity(int capacity) { - this.capacity = 
capacity; - } - - public boolean isStatisticsEnabled() { - return statisticsEnabled; + @Override + public int getSize() { + if (pool != null) { + return pool.size(); + } else { + return 0; + } } - public void setStatisticsEnabled(boolean statisticsEnabled) { - this.statisticsEnabled = statisticsEnabled; + public void setCapacity(int capacity) { + this.capacity = capacity; } @Override public Exchange create(boolean autoRelease) { Exchange exchange = pool.poll(); if (exchange == null) { - if (statisticsEnabled) { - created.incrementAndGet(); - } // create a new exchange as there was no free from the pool exchange = createPooledExchange(null, autoRelease); + if (statistics.isStatisticsEnabled()) { + statistics.created.increment(); + } } else { - if (statisticsEnabled) { - acquired.incrementAndGet(); + if (statistics.isStatisticsEnabled()) { + statistics.acquired.increment(); } // reset exchange for reuse PooledExchange ee = exchange.adapt(PooledExchange.class); @@ -126,14 +109,14 @@ public final class PooledExchangeFactory extends ServiceSupport public Exchange create(Endpoint fromEndpoint, boolean autoRelease) { Exchange exchange = pool.poll(); if (exchange == null) { - if (statisticsEnabled) { - created.incrementAndGet(); - } // create a new exchange as there was no free from the pool exchange = new DefaultPooledExchange(fromEndpoint); + if (statistics.isStatisticsEnabled()) { + statistics.created.increment(); + } } else { - if (statisticsEnabled) { - acquired.incrementAndGet(); + if (statistics.isStatisticsEnabled()) { + statistics.acquired.increment(); } // reset exchange for reuse PooledExchange ee = exchange.adapt(PooledExchange.class); @@ -154,17 +137,17 @@ public final class PooledExchangeFactory extends ServiceSupport // only release back in pool if reset was success boolean inserted = pool.offer(exchange); - if (statisticsEnabled) { + if (statistics.isStatisticsEnabled()) { if (inserted) { - released.incrementAndGet(); + statistics.released.increment(); } 
else { - discarded.incrementAndGet(); + statistics.discarded.increment(); } } return inserted; } catch (Exception e) { - if (statisticsEnabled) { - discarded.incrementAndGet(); + if (statistics.isStatisticsEnabled()) { + statistics.discarded.increment(); } LOG.debug("Error resetting exchange: {}. This exchange is discarded.", exchange); return false; @@ -187,25 +170,18 @@ public final class PooledExchangeFactory extends ServiceSupport } @Override - protected void doStop() throws Exception { + public void purge() { pool.clear(); + } - if (statisticsEnabled && consumer != null) { - // only log if there is any usage - boolean shouldLog = created.get() > 0 || acquired.get() > 0 || released.get() > 0 || discarded.get() > 0; - if (shouldLog) { - String uri = consumer.getEndpoint().getEndpointBaseUri(); - uri = URISupport.sanitizeUri(uri); - - LOG.info("PooledExchangeFactory ({}) usage [created: {}, reused: {}, released: {}, discarded: {}]", - uri, created.get(), acquired.get(), released.get(), discarded.get()); - } - } + @Override + protected void doStop() throws Exception { + exchangeFactoryManager.removeExchangeFactory(this); + logUsageSummary(LOG, "PooledExchangeFactory", pool.size()); + statistics.reset(); + pool.clear(); - created.set(0); - acquired.set(0); - released.set(0); - discarded.set(0); + // do not call super } private final class ReleaseOnDoneTask implements PooledExchange.OnDoneTask { diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java index b91710c..90f7a62 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java @@ -42,6 +42,7 @@ import org.apache.camel.spi.DataFormatResolver; import org.apache.camel.spi.DeferServiceFactory; import org.apache.camel.spi.EndpointRegistry; 
import org.apache.camel.spi.ExchangeFactory; +import org.apache.camel.spi.ExchangeFactoryManager; import org.apache.camel.spi.ExecutorServiceManager; import org.apache.camel.spi.FactoryFinder; import org.apache.camel.spi.FactoryFinderResolver; @@ -555,6 +556,11 @@ public class SimpleCamelContext extends AbstractCamelContext { } @Override + protected ExchangeFactoryManager createExchangeFactoryManager() { + return new DefaultExchangeFactoryManager(); + } + + @Override protected ReactiveExecutor createReactiveExecutor() { Optional<ReactiveExecutor> result = ResolverHelper.resolveService( getCamelContextReference(), diff --git a/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java b/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java index b6e2a47..fc3b1b3 100644 --- a/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java +++ b/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java @@ -67,6 +67,8 @@ public class ExtendedCamelContextConfigurer extends org.apache.camel.support.com case "EventNotificationApplicable": target.setEventNotificationApplicable(property(camelContext, boolean.class, value)); return true; case "exchangefactory": case "ExchangeFactory": target.setExchangeFactory(property(camelContext, org.apache.camel.spi.ExchangeFactory.class, value)); return true; + case "exchangefactorymanager": + case "ExchangeFactoryManager": target.setExchangeFactoryManager(property(camelContext, org.apache.camel.spi.ExchangeFactoryManager.class, value)); return true; case "executorservicemanager": case "ExecutorServiceManager": target.setExecutorServiceManager(property(camelContext, org.apache.camel.spi.ExecutorServiceManager.class, value)); return true; case "factoryfinderresolver": @@ -236,6 +238,8 @@ public class ExtendedCamelContextConfigurer extends org.apache.camel.support.com case 
"EventNotificationApplicable": return boolean.class; case "exchangefactory": case "ExchangeFactory": return org.apache.camel.spi.ExchangeFactory.class; + case "exchangefactorymanager": + case "ExchangeFactoryManager": return org.apache.camel.spi.ExchangeFactoryManager.class; case "executorservicemanager": case "ExecutorServiceManager": return org.apache.camel.spi.ExecutorServiceManager.class; case "factoryfinderresolver": @@ -406,6 +410,8 @@ public class ExtendedCamelContextConfigurer extends org.apache.camel.support.com case "EventNotificationApplicable": return target.isEventNotificationApplicable(); case "exchangefactory": case "ExchangeFactory": return target.getExchangeFactory(); + case "exchangefactorymanager": + case "ExchangeFactoryManager": return target.getExchangeFactoryManager(); case "executorservicemanager": case "ExecutorServiceManager": return target.getExecutorServiceManager(); case "factoryfinderresolver": diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java index 7732b81..85f642e 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java @@ -89,6 +89,7 @@ import org.apache.camel.spi.EndpointRegistry; import org.apache.camel.spi.EndpointStrategy; import org.apache.camel.spi.EndpointUriFactory; import org.apache.camel.spi.ExchangeFactory; +import org.apache.camel.spi.ExchangeFactoryManager; import org.apache.camel.spi.ExecutorServiceManager; import org.apache.camel.spi.FactoryFinder; import org.apache.camel.spi.FactoryFinderResolver; @@ -1452,6 +1453,16 @@ public class LightweightCamelContext implements ExtendedCamelContext, CatalogCam } @Override + public ExchangeFactoryManager getExchangeFactoryManager() { + return 
getExtendedCamelContext().getExchangeFactoryManager(); + } + + @Override + public void setExchangeFactoryManager(ExchangeFactoryManager exchangeFactoryManager) { + getExtendedCamelContext().setExchangeFactoryManager(exchangeFactoryManager); + } + + @Override public ReactiveExecutor getReactiveExecutor() { return getExtendedCamelContext().getReactiveExecutor(); } diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightRuntimeCamelContext.java b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightRuntimeCamelContext.java index cc96226..931073b 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightRuntimeCamelContext.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightRuntimeCamelContext.java @@ -86,6 +86,7 @@ import org.apache.camel.spi.EndpointRegistry; import org.apache.camel.spi.EndpointStrategy; import org.apache.camel.spi.EndpointUriFactory; import org.apache.camel.spi.ExchangeFactory; +import org.apache.camel.spi.ExchangeFactoryManager; import org.apache.camel.spi.ExecutorServiceManager; import org.apache.camel.spi.FactoryFinder; import org.apache.camel.spi.FactoryFinderResolver; @@ -169,6 +170,7 @@ public class LightweightRuntimeCamelContext implements ExtendedCamelContext, Cat private final BeanIntrospection beanIntrospection; private final HeadersMapFactory headersMapFactory; private final ExchangeFactory exchangeFactory; + private final ExchangeFactoryManager exchangeFactoryManager; private final ReactiveExecutor reactiveExecutor; private final AsyncProcessorAwaitManager asyncProcessorAwaitManager; private final ExecutorServiceManager executorServiceManager; @@ -214,6 +216,7 @@ public class LightweightRuntimeCamelContext implements ExtendedCamelContext, Cat beanIntrospection = context.adapt(ExtendedCamelContext.class).getBeanIntrospection(); headersMapFactory = context.adapt(ExtendedCamelContext.class).getHeadersMapFactory(); exchangeFactory = 
context.adapt(ExtendedCamelContext.class).getExchangeFactory(); + exchangeFactoryManager = context.adapt(ExtendedCamelContext.class).getExchangeFactoryManager(); reactiveExecutor = context.adapt(ExtendedCamelContext.class).getReactiveExecutor(); asyncProcessorAwaitManager = context.adapt(ExtendedCamelContext.class).getAsyncProcessorAwaitManager(); executorServiceManager = context.getExecutorServiceManager(); @@ -1567,6 +1570,16 @@ public class LightweightRuntimeCamelContext implements ExtendedCamelContext, Cat } @Override + public ExchangeFactoryManager getExchangeFactoryManager() { + return exchangeFactoryManager; + } + + @Override + public void setExchangeFactoryManager(ExchangeFactoryManager exchangeFactoryManager) { + throw new UnsupportedOperationException(); + } + + @Override public void setExchangeFactory(ExchangeFactory exchangeFactory) { throw new UnsupportedOperationException(); } diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java index dbc5188..96fbcd8 100644 --- a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java +++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java @@ -65,6 +65,19 @@ public final class CamelOpenMBeanTypes { new OpenType[] { SimpleType.STRING, SimpleType.BOOLEAN, SimpleType.BOOLEAN }); } + public static TabularType listExchangeFactoryTabularType() throws OpenDataException { + CompositeType ct = listExchangeFactoryCompositeType(); + return new TabularType("listExchangeFactory", "Lists all the exchange factories", ct, new String[] { "url" }); + } + + public static CompositeType listExchangeFactoryCompositeType() throws OpenDataException { + return new CompositeType( + "factories", "Factories", + new String[] { "url", "capacity", "pooled", "created", "released" }, + 
new String[] { "Url", "Capacity", "Pooled", "Created", "Released" }, + new OpenType[] { SimpleType.STRING, SimpleType.INTEGER, SimpleType.INTEGER, SimpleType.LONG, SimpleType.LONG }); + } + public static TabularType listRuntimeEndpointsTabularType() throws OpenDataException { CompositeType ct = listRuntimeEndpointsCompositeType(); return new TabularType( diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedExchangeFactoryManagerMBean.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedExchangeFactoryManagerMBean.java new file mode 100644 index 0000000..bedebc9 --- /dev/null +++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedExchangeFactoryManagerMBean.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.api.management.mbean; + +import javax.management.openmbean.TabularData; + +import org.apache.camel.api.management.ManagedAttribute; +import org.apache.camel.api.management.ManagedOperation; + +public interface ManagedExchangeFactoryManagerMBean extends ManagedServiceMBean { + + @ManagedAttribute(description = "Max capacity per consumer for exchange pooling") + Integer getCapacity(); + + @ManagedAttribute(description = "Whether statistics is enabled") + Boolean getStatisticsEnabled(); + + @ManagedAttribute(description = "Whether statistics is enabled") + void setStatisticsEnabled(Boolean statisticsEnabled); + + @ManagedOperation(description = "Reset statistics") + void resetStatistics(); + + @ManagedOperation(description = "Purges the pool") + void purge(); + + @ManagedOperation(description = "Lists all the consumers and their pooling statistics") + TabularData listPools(); + +} diff --git a/core/camel-management/src/main/java/org/apache/camel/management/JmxManagementLifecycleStrategy.java b/core/camel-management/src/main/java/org/apache/camel/management/JmxManagementLifecycleStrategy.java index aae3e70..943584f 100644 --- a/core/camel-management/src/main/java/org/apache/camel/management/JmxManagementLifecycleStrategy.java +++ b/core/camel-management/src/main/java/org/apache/camel/management/JmxManagementLifecycleStrategy.java @@ -60,6 +60,7 @@ import org.apache.camel.management.mbean.ManagedCamelContext; import org.apache.camel.management.mbean.ManagedConsumerCache; import org.apache.camel.management.mbean.ManagedEndpoint; import org.apache.camel.management.mbean.ManagedEndpointRegistry; +import org.apache.camel.management.mbean.ManagedExchangeFactoryManager; import org.apache.camel.management.mbean.ManagedInflightRepository; import org.apache.camel.management.mbean.ManagedProducerCache; import org.apache.camel.management.mbean.ManagedRestRegistry; @@ -86,6 +87,7 @@ import org.apache.camel.spi.ConsumerCache; import 
org.apache.camel.spi.DataFormat; import org.apache.camel.spi.EndpointRegistry; import org.apache.camel.spi.EventNotifier; +import org.apache.camel.spi.ExchangeFactoryManager; import org.apache.camel.spi.InflightRepository; import org.apache.camel.spi.InternalProcessor; import org.apache.camel.spi.LifecycleStrategy; @@ -537,6 +539,8 @@ public class JmxManagementLifecycleStrategy extends ServiceSupport implements Li answer = new ManagedConsumerCache(context, (ConsumerCache) service); } else if (service instanceof ProducerCache) { answer = new ManagedProducerCache(context, (ProducerCache) service); + } else if (service instanceof ExchangeFactoryManager) { + answer = new ManagedExchangeFactoryManager(context, (ExchangeFactoryManager) service); } else if (service instanceof EndpointRegistry) { answer = new ManagedEndpointRegistry(context, (EndpointRegistry) service); } else if (service instanceof BeanIntrospection) { diff --git a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedExchangeFactoryManager.java b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedExchangeFactoryManager.java new file mode 100644 index 0000000..bab7a0d --- /dev/null +++ b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedExchangeFactoryManager.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.management.mbean; + +import java.util.Collection; + +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.TabularData; +import javax.management.openmbean.TabularDataSupport; + +import org.apache.camel.CamelContext; +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.api.management.ManagedResource; +import org.apache.camel.api.management.mbean.CamelOpenMBeanTypes; +import org.apache.camel.api.management.mbean.ManagedExchangeFactoryManagerMBean; +import org.apache.camel.spi.ExchangeFactory; +import org.apache.camel.spi.ExchangeFactoryManager; +import org.apache.camel.spi.ManagementStrategy; +import org.apache.camel.util.URISupport; + +@ManagedResource(description = "Managed ExchangeFactory") +public class ManagedExchangeFactoryManager extends ManagedService implements ManagedExchangeFactoryManagerMBean { + + private ExchangeFactoryManager exchangeFactoryManager; + private boolean sanitize; + + public ManagedExchangeFactoryManager(CamelContext context, ExchangeFactoryManager exchangeFactoryManager) { + super(context, exchangeFactoryManager); + this.exchangeFactoryManager = exchangeFactoryManager; + } + + @Override + public void init(ManagementStrategy strategy) { + super.init(strategy); + sanitize = strategy.getManagementAgent().getMask() != null ? 
strategy.getManagementAgent().getMask() : false; + } + + @Override + public Integer getCapacity() { + return exchangeFactoryManager.getCapacity(); + } + + @Override + public Boolean getStatisticsEnabled() { + return exchangeFactoryManager.isStatisticsEnabled(); + } + + @Override + public void setStatisticsEnabled(Boolean statisticsEnabled) { + exchangeFactoryManager.setStatisticsEnabled(statisticsEnabled); + } + + @Override + public void resetStatistics() { + exchangeFactoryManager.resetStatistics(); + } + + @Override + public void purge() { + exchangeFactoryManager.purge(); + } + + @Override + public TabularData listPools() { + try { + TabularData answer = new TabularDataSupport(CamelOpenMBeanTypes.listExchangeFactoryTabularType()); + Collection<ExchangeFactory> factories = exchangeFactoryManager.getExchangeFactories(); + for (ExchangeFactory ef : factories) { + CompositeType ct = CamelOpenMBeanTypes.listExchangeFactoryCompositeType(); + String url = ef.getConsumer().getEndpoint().getEndpointUri(); + if (sanitize) { + url = URISupport.sanitizeUri(url); + } + + int capacity = ef.getCapacity(); + int size = ef.getSize(); + long created = 0; + long released = 0; + if (ef.isStatisticsEnabled()) { + created = ef.getStatistics().getCreatedCounter(); + released = ef.getStatistics().getReleasedCounter(); + } + + CompositeData data = new CompositeDataSupport( + ct, new String[] { "url", "capacity", "pooled", "created", "released" }, + new Object[] { url, capacity, size, created, released }); + answer.put(data); + } + return answer; + } catch (Exception e) { + throw RuntimeCamelException.wrapRuntimeCamelException(e); + } + } +} diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java index f27f56a..a57ef22 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java +++ 
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java @@ -26,7 +26,6 @@ import org.apache.camel.PooledExchange; import org.apache.camel.Processor; import org.apache.camel.Route; import org.apache.camel.RouteAware; -import org.apache.camel.Service; import org.apache.camel.spi.ExceptionHandler; import org.apache.camel.spi.ExchangeFactory; import org.apache.camel.spi.RouteIdAware; @@ -174,9 +173,7 @@ public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAw @Override protected void doBuild() throws Exception { super.doBuild(); - if (exchangeFactory instanceof Service) { - ((Service) exchangeFactory).build(); - } + exchangeFactory.build(); } @Override