CAMEL-9014: Option to turn on extended JMX statistics for EIPs to track fine grained utilization statistics such as which and how often they send to dynamic endpoints.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e55b0e8c Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e55b0e8c Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e55b0e8c Branch: refs/heads/master Commit: e55b0e8ce07950b44cfe067470f351a6cbe0d95f Parents: 88ab0c4 Author: Claus Ibsen <davscl...@apache.org> Authored: Sun Jul 26 13:43:08 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun Jul 26 13:43:08 2015 +0200 ---------------------------------------------------------------------- .../mbean/ManagedPollEnricherMBean.java | 8 ++- .../org/apache/camel/impl/ConsumerCache.java | 66 ++++++++++++++++++-- .../org/apache/camel/impl/ProducerCache.java | 7 +-- .../management/mbean/ManagedPollEnricher.java | 51 ++++++++++++++- .../apache/camel/processor/PollEnricher.java | 5 ++ .../management/ManagedPollEnricherTest.java | 15 ++++- 6 files changed, 141 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/e55b0e8c/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedPollEnricherMBean.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedPollEnricherMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedPollEnricherMBean.java index 623b85c..da11262 100644 --- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedPollEnricherMBean.java +++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedPollEnricherMBean.java @@ -16,9 +16,12 @@ */ package org.apache.camel.api.management.mbean; +import javax.management.openmbean.TabularData; + import org.apache.camel.api.management.ManagedAttribute; +import org.apache.camel.api.management.ManagedOperation; -public interface ManagedPollEnricherMBean extends ManagedProcessorMBean { +public interface ManagedPollEnricherMBean extends ManagedProcessorMBean, ManagedExtendedInformation { @ManagedAttribute(description = "The language for the expression") String getExpressionLanguage(); @@ -38,4 +41,7 @@ public interface ManagedPollEnricherMBean extends ManagedProcessorMBean { @ManagedAttribute(description = "Whether to aggregate when there was an exception thrown during calling the resource endpoint") Boolean isAggregateOnException(); + @ManagedOperation(description = "Statistics of the endpoints that has been poll enriched from") + TabularData extendedInformation(); + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/e55b0e8c/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java b/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java index d957efe..399012c 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java +++ b/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java @@ -26,6 +26,7 @@ import org.apache.camel.IsSingleton; import org.apache.camel.PollingConsumer; import org.apache.camel.RuntimeCamelException; import org.apache.camel.ServicePoolAware; +import org.apache.camel.spi.EndpointUtilizationStatistics; import org.apache.camel.spi.ServicePool; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.CamelContextHelper; @@ -41,11 +42,16 @@ import org.slf4j.LoggerFactory; */ public class ConsumerCache extends ServiceSupport { private static final Logger LOG = LoggerFactory.getLogger(ConsumerCache.class); + private final CamelContext camelContext; private final ServicePool<Endpoint, PollingConsumer> pool; private final Map<String, PollingConsumer> consumers; private final Object source; + private EndpointUtilizationStatistics statistics; + private boolean extendedStatistics; + private int maxCacheSize; + public ConsumerCache(Object source, CamelContext camelContext) { this(source, camelContext, CamelContextHelper.getMaximumCachePoolSize(camelContext)); } @@ -63,6 +69,27 @@ public class ConsumerCache extends ServiceSupport { this.consumers = cache; this.source = source; this.pool = pool; + if (consumers instanceof LRUCache) { + maxCacheSize = ((LRUCache) consumers).getMaxCacheSize(); + } + + // only if JMX is enabled + if (camelContext.getManagementStrategy().getManagementAgent() != null) { + this.extendedStatistics = camelContext.getManagementStrategy().getManagementAgent().getStatisticsLevel().isExtended(); + } else { + this.extendedStatistics = false; + } + } + + public boolean isExtendedStatistics() { + return extendedStatistics; + } + + /** + * Whether extended JMX statistics is enabled for {@link org.apache.camel.spi.EndpointUtilizationStatistics} + */ + public void setExtendedStatistics(boolean extendedStatistics) { + this.extendedStatistics = extendedStatistics; } /** @@ -156,8 +183,15 @@ public class ConsumerCache extends ServiceSupport { } } } + + if (answer != null) { + // record statistics + if (extendedStatistics) { + statistics.onHit(key); + } + } + return answer; - } public Exchange receive(Endpoint endpoint) { @@ -295,6 +329,9 @@ public class ConsumerCache extends ServiceSupport { LRUCache<String, PollingConsumer> cache = (LRUCache<String, PollingConsumer>)consumers; cache.resetStatistics(); } + if (statistics != null) { + statistics.clear(); + } } /** @@ -302,6 +339,13 @@ public class ConsumerCache extends ServiceSupport { */ public synchronized void purge() { consumers.clear(); + if (statistics != null) { + statistics.clear(); + } + } + + public EndpointUtilizationStatistics getEndpointUtilizationStatistics() { + return statistics; } @Override @@ -310,15 +354,29 @@ public class ConsumerCache extends ServiceSupport { } protected void doStart() throws Exception { + if (extendedStatistics) { + int max = maxCacheSize == 0 ? CamelContextHelper.getMaximumCachePoolSize(camelContext) : maxCacheSize; + statistics = new DefaultEndpointUtilizationStatistics(max); + } + ServiceHelper.startServices(consumers.values()); } protected void doStop() throws Exception { // when stopping we intend to shutdown - ServiceHelper.stopAndShutdownServices(consumers.values()); + ServiceHelper.stopAndShutdownServices(statistics, pool); + try { + ServiceHelper.stopAndShutdownServices(consumers.values()); + } finally { + // ensure consumers are removed, and also from JMX + for (PollingConsumer consumer : consumers.values()) { + getCamelContext().removeService(consumer); + } + } consumers.clear(); - // we need to stop the pool service here - ServiceHelper.stopService(pool); + if (statistics != null) { + statistics.clear(); + } } } http://git-wip-us.apache.org/repos/asf/camel/blob/e55b0e8c/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java b/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java index 7b2cd37..5b79954 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java +++ b/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java @@ -67,9 +67,6 @@ public class ProducerCache extends ServiceSupport { public ProducerCache(Object source, CamelContext camelContext, int cacheSize) { this(source, camelContext, camelContext.getProducerServicePool(), createLRUCache(cacheSize)); - if (producers instanceof LRUCache) { - maxCacheSize = ((LRUCache) producers).getMaxCacheSize(); - } } public ProducerCache(Object source, CamelContext camelContext, Map<String, Producer> cache) { @@ -81,6 +78,9 @@ public class ProducerCache extends ServiceSupport { this.camelContext = camelContext; this.pool = producerServicePool; this.producers = cache; + if (producers instanceof LRUCache) { + maxCacheSize = ((LRUCache) producers).getMaxCacheSize(); + } // only if JMX is enabled if (camelContext.getManagementStrategy().getManagementAgent() != null) { @@ -451,7 +451,6 @@ public class ProducerCache extends ServiceSupport { if (extendedStatistics) { statistics.onHit(key); } - } return answer; http://git-wip-us.apache.org/repos/asf/camel/blob/e55b0e8c/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedPollEnricher.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedPollEnricher.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedPollEnricher.java index 0b71e8f..ae0b7f1 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedPollEnricher.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedPollEnricher.java @@ -16,12 +16,22 @@ */ package org.apache.camel.management.mbean; +import java.util.Map; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.TabularData; +import javax.management.openmbean.TabularDataSupport; + import org.apache.camel.CamelContext; import org.apache.camel.api.management.ManagedResource; +import org.apache.camel.api.management.mbean.CamelOpenMBeanTypes; import org.apache.camel.api.management.mbean.ManagedPollEnricherMBean; import org.apache.camel.model.PollEnrichDefinition; import org.apache.camel.processor.PollEnricher; +import org.apache.camel.spi.EndpointUtilizationStatistics; import org.apache.camel.spi.ManagementStrategy; +import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.URISupport; /** @@ -31,6 +41,7 @@ import org.apache.camel.util.URISupport; public class ManagedPollEnricher extends ManagedProcessor implements ManagedPollEnricherMBean { private final PollEnricher processor; private String uri; + private boolean sanitize; public ManagedPollEnricher(CamelContext context, PollEnricher processor, PollEnrichDefinition definition) { super(context, processor, definition); @@ -39,7 +50,7 @@ public class ManagedPollEnricher extends ManagedProcessor implements ManagedPoll public void init(ManagementStrategy strategy) { super.init(strategy); - boolean sanitize = strategy.getManagementAgent().getMask() != null ? strategy.getManagementAgent().getMask() : false; + sanitize = strategy.getManagementAgent().getMask() != null ? strategy.getManagementAgent().getMask() : false; uri = getDefinition().getExpression().getExpression(); if (sanitize) { uri = URISupport.sanitizeUri(uri); @@ -47,6 +58,14 @@ public class ManagedPollEnricher extends ManagedProcessor implements ManagedPoll } @Override + public synchronized void reset() { + super.reset(); + if (processor.getEndpointUtilizationStatistics() != null) { + processor.getEndpointUtilizationStatistics().clear(); + } + } + + @Override public PollEnrichDefinition getDefinition() { return (PollEnrichDefinition) super.getDefinition(); } @@ -85,4 +104,34 @@ public class ManagedPollEnricher extends ManagedProcessor implements ManagedPoll public Boolean isAggregateOnException() { return processor.isAggregateOnException(); } + + @Override + public TabularData extendedInformation() { + try { + TabularData answer = new TabularDataSupport(CamelOpenMBeanTypes.endpointsUtilizationTabularType()); + + EndpointUtilizationStatistics stats = processor.getEndpointUtilizationStatistics(); + if (stats != null) { + for (Map.Entry<String, Long> entry : stats.getStatistics().entrySet()) { + CompositeType ct = CamelOpenMBeanTypes.endpointsUtilizationCompositeType(); + String url = entry.getKey(); + if (sanitize) { + url = URISupport.sanitizeUri(url); + } + + Long hits = entry.getValue(); + if (hits == null) { + hits = 0L; + } + + CompositeData data = new CompositeDataSupport(ct, new String[]{"url", "hits"}, new Object[]{url, hits}); + answer.put(data); + } + } + return answer; + } catch (Exception e) { + throw ObjectHelper.wrapRuntimeCamelException(e); + } + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/e55b0e8c/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java index 998926b..691e55d 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java +++ b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java @@ -28,6 +28,7 @@ import org.apache.camel.PollingConsumer; import org.apache.camel.impl.ConsumerCache; import org.apache.camel.impl.EmptyConsumerCache; import org.apache.camel.processor.aggregate.AggregationStrategy; +import org.apache.camel.spi.EndpointUtilizationStatistics; import org.apache.camel.spi.IdAware; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; @@ -94,6 +95,10 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, IdAw return expression; } + public EndpointUtilizationStatistics getEndpointUtilizationStatistics() { + return consumerCache.getEndpointUtilizationStatistics(); + } + public AggregationStrategy getAggregationStrategy() { return aggregationStrategy; } http://git-wip-us.apache.org/repos/asf/camel/blob/e55b0e8c/camel-core/src/test/java/org/apache/camel/management/ManagedPollEnricherTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedPollEnricherTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedPollEnricherTest.java index 657a374..790c512 100644 --- a/camel-core/src/test/java/org/apache/camel/management/ManagedPollEnricherTest.java +++ b/camel-core/src/test/java/org/apache/camel/management/ManagedPollEnricherTest.java @@ -20,6 +20,8 @@ import javax.management.MBeanServer; import javax.management.ObjectName; import javax.management.openmbean.TabularData; +import org.apache.camel.CamelContext; +import org.apache.camel.ManagementStatisticsLevel; import org.apache.camel.ServiceStatus; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; @@ -29,6 +31,13 @@ import org.apache.camel.component.mock.MockEndpoint; */ public class ManagedPollEnricherTest extends ManagementTestSupport { + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext context = super.createCamelContext(); + context.getManagementStrategy().getManagementAgent().setStatisticsLevel(ManagementStatisticsLevel.Extended); + return context; + } + public void testManagePollEnricher() throws Exception { // JMX tests dont work well on AIX CI servers (hangs them) if (isPlatform("aix")) { @@ -68,7 +77,11 @@ public class ManagedPollEnricherTest extends ManagementTestSupport { String uri = (String) mbeanServer.getAttribute(on, "Expression"); assertEquals("seda:${header.whereto}", uri); - TabularData data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{false}, new String[]{"boolean"}); + TabularData data = (TabularData) mbeanServer.invoke(on, "extendedInformation", null, null); + assertNotNull(data); + assertEquals(1, data.size()); + + data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{false}, new String[]{"boolean"}); assertNotNull(data); assertEquals(3, data.size());