Repository: camel Updated Branches: refs/heads/master 15da595d4 -> 18e89e77c
CAMEL-9014: Option to turn on extended JMX statistics for EIPs to track fine grained utilization statistics such as which and how often they send to dynamic endpoints. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1a576fab Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1a576fab Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1a576fab Branch: refs/heads/master Commit: 1a576fabb9db51e5d77400099846139cf8c2cb60 Parents: e25e24c Author: Claus Ibsen <davscl...@apache.org> Authored: Sat Jul 25 13:40:52 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sat Jul 25 13:59:38 2015 +0200 ---------------------------------------------------------------------- .../management/mbean/CamelOpenMBeanTypes.java | 12 ++++ .../mbean/ManagedDynamicRouterMBean.java | 6 ++ .../mbean/ManagedRecipientListMBean.java | 6 ++ .../mbean/ManagedRoutingSlipMBean.java | 6 ++ .../mbean/ManagedSendDynamicProcessorMBean.java | 6 ++ .../DefaultEndpointUtilizationStatistics.java | 69 ++++++++++++++++++++ .../org/apache/camel/impl/ProducerCache.java | 53 ++++++++++++++- .../management/DefaultManagementStrategy.java | 9 +++ .../management/mbean/ManagedDynamicRouter.java | 44 ++++++++++++- .../management/mbean/ManagedRecipientList.java | 43 +++++++++++- .../management/mbean/ManagedRoutingSlip.java | 44 ++++++++++++- .../mbean/ManagedSendDynamicProcessor.java | 42 +++++++++++- .../apache/camel/processor/RecipientList.java | 5 ++ .../org/apache/camel/processor/RoutingSlip.java | 5 ++ .../camel/processor/SendDynamicProcessor.java | 5 ++ .../spi/EndpointUtilizationStatistics.java | 40 ++++++++++++ .../apache/camel/spi/ManagementStrategy.java | 14 ++++ .../camel/impl/DefaultProducerCacheTest.java | 50 ++++++++++++++ .../management/ManagedDynamicRouterTest.java | 25 ++++++- .../management/ManagedRecipientListTest.java | 27 ++++++-- .../management/ManagedRoutingSlipTest.java | 24 ++++++- 
.../ManagedSendDynamicProcessorTest.java | 23 ++++++- 22 files changed, 538 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/1a576fab/camel-core/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java index 9b45573..6c8f571 100644 --- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java +++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java @@ -168,4 +168,16 @@ public final class CamelOpenMBeanTypes { new String[]{"Exception", "Failures"}, new OpenType[]{SimpleType.STRING, SimpleType.LONG}); } + + public static TabularType endpointsUtilizationTabularType() throws OpenDataException { + CompositeType ct = endpointsUtilizationCompositeType(); + return new TabularType("endpointsUtilization", "Endpoint utilization statistics", ct, new String[]{"url"}); + } + + public static CompositeType endpointsUtilizationCompositeType() throws OpenDataException { + return new CompositeType("endpoints", "Endpoints", new String[]{"url", "hits"}, + new String[]{"Url", "Hits"}, + new OpenType[]{SimpleType.STRING, SimpleType.LONG}); + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/1a576fab/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedDynamicRouterMBean.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedDynamicRouterMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedDynamicRouterMBean.java index 8901525..a07347f 100644 --- 
a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedDynamicRouterMBean.java +++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedDynamicRouterMBean.java @@ -16,7 +16,10 @@ */ package org.apache.camel.api.management.mbean; +import javax.management.openmbean.TabularData; + import org.apache.camel.api.management.ManagedAttribute; +import org.apache.camel.api.management.ManagedOperation; public interface ManagedDynamicRouterMBean extends ManagedProcessorMBean { @@ -35,4 +38,7 @@ public interface ManagedDynamicRouterMBean extends ManagedProcessorMBean { @ManagedAttribute(description = "Ignore the invalidate endpoint exception when try to create a producer with that endpoint") Boolean isIgnoreInvalidEndpoints(); + @ManagedOperation(description = "Statistics of the endpoints which has been sent to") + TabularData endpointStatistics(); + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/1a576fab/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRecipientListMBean.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRecipientListMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRecipientListMBean.java index e0ac299..4e02bcc 100644 --- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRecipientListMBean.java +++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRecipientListMBean.java @@ -16,7 +16,10 @@ */ package org.apache.camel.api.management.mbean; +import javax.management.openmbean.TabularData; + import org.apache.camel.api.management.ManagedAttribute; +import org.apache.camel.api.management.ManagedOperation; public interface ManagedRecipientListMBean extends ManagedProcessorMBean { @@ -50,4 +53,7 @@ public interface ManagedRecipientListMBean extends ManagedProcessorMBean { 
@ManagedAttribute(description = "The total timeout specified in millis, when using parallel processing.") Long getTimeout(); + @ManagedOperation(description = "Statistics of the endpoints which has been sent to") + TabularData endpointStatistics(); + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/1a576fab/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRoutingSlipMBean.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRoutingSlipMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRoutingSlipMBean.java index 828384f..7a660ed 100644 --- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRoutingSlipMBean.java +++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRoutingSlipMBean.java @@ -16,7 +16,10 @@ */ package org.apache.camel.api.management.mbean; +import javax.management.openmbean.TabularData; + import org.apache.camel.api.management.ManagedAttribute; +import org.apache.camel.api.management.ManagedOperation; public interface ManagedRoutingSlipMBean extends ManagedProcessorMBean { @@ -35,4 +38,7 @@ public interface ManagedRoutingSlipMBean extends ManagedProcessorMBean { @ManagedAttribute(description = "Ignore the invalidate endpoint exception when try to create a producer with that endpoint") Boolean isIgnoreInvalidEndpoints(); + @ManagedOperation(description = "Statistics of the endpoints which has been sent to") + TabularData endpointStatistics(); + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/1a576fab/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSendDynamicProcessorMBean.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSendDynamicProcessorMBean.java 
b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSendDynamicProcessorMBean.java index 9a6ae94..2d8d662 100644 --- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSendDynamicProcessorMBean.java +++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSendDynamicProcessorMBean.java @@ -16,7 +16,10 @@ */ package org.apache.camel.api.management.mbean; +import javax.management.openmbean.TabularData; + import org.apache.camel.api.management.ManagedAttribute; +import org.apache.camel.api.management.ManagedOperation; public interface ManagedSendDynamicProcessorMBean extends ManagedProcessorMBean { @@ -32,4 +35,7 @@ public interface ManagedSendDynamicProcessorMBean extends ManagedProcessorMBean @ManagedAttribute(description = "Ignore the invalidate endpoint exception when try to create a producer with that endpoint") Boolean isIgnoreInvalidEndpoint(); + @ManagedOperation(description = "Statistics of the endpoints which has been sent to") + TabularData endpointStatistics(); + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/1a576fab/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpointUtilizationStatistics.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpointUtilizationStatistics.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpointUtilizationStatistics.java new file mode 100644 index 0000000..9c87550 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpointUtilizationStatistics.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl; + +import java.util.Collections; +import java.util.Map; + +import org.apache.camel.spi.EndpointUtilizationStatistics; +import org.apache.camel.util.LRUCache; + +public class DefaultEndpointUtilizationStatistics implements EndpointUtilizationStatistics { + + private final LRUCache<String, Long> map; + + public DefaultEndpointUtilizationStatistics(int maxCapacity) { + this.map = new LRUCache<String, Long>(16, maxCapacity, false); + } + + @Override + public int maxCapacity() { + return map.getMaxCacheSize(); + } + + @Override + public int size() { + return map.size(); + } + + @Override + public synchronized void onHit(String key) { + Long counter = map.get(key); + if (counter == null) { + counter = 1L; + map.put(key, counter); + } else { + counter++; + map.put(key, counter); + } + } + + @Override + public void remove(String key) { + map.remove(key); + } + + @Override + public Map<String, Long> getStatistics() { + return Collections.unmodifiableMap(map); + } + + @Override + public void clear() { + map.clear(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/1a576fab/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java b/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java 
index 4ece29f..b321df2 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java +++ b/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java @@ -31,6 +31,7 @@ import org.apache.camel.Producer; import org.apache.camel.ProducerCallback; import org.apache.camel.ServicePoolAware; import org.apache.camel.processor.UnitOfWorkProducer; +import org.apache.camel.spi.EndpointUtilizationStatistics; import org.apache.camel.spi.ServicePool; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorConverterHelper; @@ -54,7 +55,11 @@ public class ProducerCache extends ServiceSupport { private final ServicePool<Endpoint, Producer> pool; private final Map<String, Producer> producers; private final Object source; + + private EndpointUtilizationStatistics statistics; private boolean eventNotifierEnabled = true; + private boolean extendedStatistics; + private int maxCacheSize; public ProducerCache(Object source, CamelContext camelContext) { this(source, camelContext, CamelContextHelper.getMaximumCachePoolSize(camelContext)); @@ -62,6 +67,9 @@ public class ProducerCache extends ServiceSupport { public ProducerCache(Object source, CamelContext camelContext, int cacheSize) { this(source, camelContext, camelContext.getProducerServicePool(), createLRUCache(cacheSize)); + if (producers instanceof LRUCache) { + maxCacheSize = ((LRUCache) producers).getMaxCacheSize(); + } } public ProducerCache(Object source, CamelContext camelContext, Map<String, Producer> cache) { @@ -73,16 +81,31 @@ public class ProducerCache extends ServiceSupport { this.camelContext = camelContext; this.pool = producerServicePool; this.producers = cache; + this.extendedStatistics = camelContext.getManagementStrategy().isExtendedStatisticsEnabled(); } public boolean isEventNotifierEnabled() { return eventNotifierEnabled; } + /** + * Whether {@link org.apache.camel.spi.EventNotifier} is enabled + */ public void setEventNotifierEnabled(boolean 
eventNotifierEnabled) { this.eventNotifierEnabled = eventNotifierEnabled; } + public boolean isExtendedStatistics() { + return extendedStatistics; + } + + /** + * Whether extended JMX statistics is enabled for {@link org.apache.camel.spi.EndpointUtilizationStatistics} + */ + public void setExtendedStatistics(boolean extendedStatistics) { + this.extendedStatistics = extendedStatistics; + } + /** * Creates the {@link LRUCache} to be used. * <p/> @@ -417,17 +440,30 @@ public class ProducerCache extends ServiceSupport { } } + if (answer != null) { + // record statistics + if (extendedStatistics) { + statistics.onHit(key); + } + + } + return answer; } protected void doStart() throws Exception { + if (extendedStatistics) { + int max = maxCacheSize == 0 ? CamelContextHelper.getMaximumCachePoolSize(camelContext) : maxCacheSize; + statistics = new DefaultEndpointUtilizationStatistics(max); + } + ServiceHelper.startServices(producers.values()); - ServiceHelper.startServices(pool); + ServiceHelper.startServices(statistics, pool); } protected void doStop() throws Exception { // when stopping we intend to shutdown - ServiceHelper.stopAndShutdownService(pool); + ServiceHelper.stopAndShutdownServices(statistics, pool); try { ServiceHelper.stopAndShutdownServices(producers.values()); } finally { @@ -437,6 +473,9 @@ public class ProducerCache extends ServiceSupport { } } producers.clear(); + if (statistics != null) { + statistics.clear(); + } } /** @@ -524,6 +563,9 @@ public class ProducerCache extends ServiceSupport { LRUCache<String, Producer> cache = (LRUCache<String, Producer>)producers; cache.resetStatistics(); } + if (statistics != null) { + statistics.clear(); + } } /** @@ -532,6 +574,13 @@ public class ProducerCache extends ServiceSupport { public synchronized void purge() { producers.clear(); pool.purge(); + if (statistics != null) { + statistics.clear(); + } + } + + public EndpointUtilizationStatistics getEndpointUtilizationStatistics() { + return statistics; } @Override 
http://git-wip-us.apache.org/repos/asf/camel/blob/1a576fab/camel-core/src/main/java/org/apache/camel/management/DefaultManagementStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementStrategy.java b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementStrategy.java index 303718a..07fb638 100644 --- a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementStrategy.java @@ -62,6 +62,7 @@ public class DefaultManagementStrategy extends ServiceSupport implements Managem private ManagementAgent managementAgent; private ManagementStatisticsLevel statisticsLevel = ManagementStatisticsLevel.All; private boolean loadStatisticsEnabled; + private boolean extendedStatisticsEnabled; private CamelContext camelContext; public DefaultManagementStrategy() { @@ -202,6 +203,14 @@ public class DefaultManagementStrategy extends ServiceSupport implements Managem this.loadStatisticsEnabled = loadStatisticsEnabled; } + public boolean isExtendedStatisticsEnabled() { + return extendedStatisticsEnabled; + } + + public void setExtendedStatisticsEnabled(boolean extendedStatisticsEnabled) { + this.extendedStatisticsEnabled = extendedStatisticsEnabled; + } + protected void doStart() throws Exception { LOG.info("JMX is disabled"); doStartManagementStrategy(); http://git-wip-us.apache.org/repos/asf/camel/blob/1a576fab/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedDynamicRouter.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedDynamicRouter.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedDynamicRouter.java index 5d371a9..e82d32d 100644 --- 
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedDynamicRouter.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedDynamicRouter.java @@ -16,13 +16,22 @@ */ package org.apache.camel.management.mbean; +import java.util.Map; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.TabularData; +import javax.management.openmbean.TabularDataSupport; + import org.apache.camel.CamelContext; import org.apache.camel.api.management.ManagedResource; +import org.apache.camel.api.management.mbean.CamelOpenMBeanTypes; import org.apache.camel.api.management.mbean.ManagedDynamicRouterMBean; import org.apache.camel.model.DynamicRouterDefinition; -import org.apache.camel.model.ProcessorDefinition; import org.apache.camel.processor.DynamicRouter; +import org.apache.camel.spi.EndpointUtilizationStatistics; import org.apache.camel.spi.ManagementStrategy; +import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.URISupport; /** @@ -32,6 +41,7 @@ import org.apache.camel.util.URISupport; public class ManagedDynamicRouter extends ManagedProcessor implements ManagedDynamicRouterMBean { private final DynamicRouter processor; private String uri; + private boolean sanitize; public ManagedDynamicRouter(CamelContext context, DynamicRouter processor, DynamicRouterDefinition definition) { super(context, processor, definition); @@ -46,7 +56,7 @@ public class ManagedDynamicRouter extends ManagedProcessor implements ManagedDyn @Override public void init(ManagementStrategy strategy) { super.init(strategy); - boolean sanitize = strategy.getManagementAgent().getMask() != null ? strategy.getManagementAgent().getMask() : false; + sanitize = strategy.getManagementAgent().getMask() != null ? 
strategy.getManagementAgent().getMask() : false; uri = getDefinition().getExpression().getExpression(); if (sanitize) { uri = URISupport.sanitizeUri(uri); @@ -77,4 +87,34 @@ public class ManagedDynamicRouter extends ManagedProcessor implements ManagedDyn public Boolean isIgnoreInvalidEndpoints() { return processor.isIgnoreInvalidEndpoints(); } + + @Override + public TabularData endpointStatistics() { + try { + TabularData answer = new TabularDataSupport(CamelOpenMBeanTypes.endpointsUtilizationTabularType()); + + EndpointUtilizationStatistics stats = processor.getEndpointUtilizationStatistics(); + if (stats != null) { + for (Map.Entry<String, Long> entry : stats.getStatistics().entrySet()) { + CompositeType ct = CamelOpenMBeanTypes.endpointsUtilizationCompositeType(); + String url = entry.getKey(); + if (sanitize) { + url = URISupport.sanitizeUri(url); + } + + Long hits = entry.getValue(); + if (hits == null) { + hits = 0L; + } + + CompositeData data = new CompositeDataSupport(ct, new String[]{"url", "hits"}, new Object[]{url, hits}); + answer.put(data); + } + } + return answer; + } catch (Exception e) { + throw ObjectHelper.wrapRuntimeCamelException(e); + } + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/1a576fab/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRecipientList.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRecipientList.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRecipientList.java index 1033d60..394459d 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRecipientList.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRecipientList.java @@ -16,12 +16,22 @@ */ package org.apache.camel.management.mbean; +import java.util.Map; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import 
javax.management.openmbean.CompositeType; +import javax.management.openmbean.TabularData; +import javax.management.openmbean.TabularDataSupport; + import org.apache.camel.CamelContext; import org.apache.camel.api.management.ManagedResource; +import org.apache.camel.api.management.mbean.CamelOpenMBeanTypes; import org.apache.camel.api.management.mbean.ManagedRecipientListMBean; import org.apache.camel.model.RecipientListDefinition; import org.apache.camel.processor.RecipientList; +import org.apache.camel.spi.EndpointUtilizationStatistics; import org.apache.camel.spi.ManagementStrategy; +import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.URISupport; /** @@ -31,6 +41,7 @@ import org.apache.camel.util.URISupport; public class ManagedRecipientList extends ManagedProcessor implements ManagedRecipientListMBean { private final RecipientList processor; private String uri; + private boolean sanitize; public ManagedRecipientList(CamelContext context, RecipientList processor, RecipientListDefinition definition) { super(context, processor, definition); @@ -40,7 +51,7 @@ public class ManagedRecipientList extends ManagedProcessor implements ManagedRec @Override public void init(ManagementStrategy strategy) { super.init(strategy); - boolean sanitize = strategy.getManagementAgent().getMask() != null ? strategy.getManagementAgent().getMask() : false; + sanitize = strategy.getManagementAgent().getMask() != null ? 
strategy.getManagementAgent().getMask() : false; uri = getDefinition().getExpression().getExpression(); if (sanitize) { uri = URISupport.sanitizeUri(uri); @@ -101,4 +112,34 @@ public class ManagedRecipientList extends ManagedProcessor implements ManagedRec public Long getTimeout() { return processor.getTimeout(); } + + @Override + public TabularData endpointStatistics() { + try { + TabularData answer = new TabularDataSupport(CamelOpenMBeanTypes.endpointsUtilizationTabularType()); + + EndpointUtilizationStatistics stats = processor.getEndpointUtilizationStatistics(); + if (stats != null) { + for (Map.Entry<String, Long> entry : stats.getStatistics().entrySet()) { + CompositeType ct = CamelOpenMBeanTypes.endpointsUtilizationCompositeType(); + String url = entry.getKey(); + if (sanitize) { + url = URISupport.sanitizeUri(url); + } + + Long hits = entry.getValue(); + if (hits == null) { + hits = 0L; + } + + CompositeData data = new CompositeDataSupport(ct, new String[]{"url", "hits"}, new Object[]{url, hits}); + answer.put(data); + } + } + return answer; + } catch (Exception e) { + throw ObjectHelper.wrapRuntimeCamelException(e); + } + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/1a576fab/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoutingSlip.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoutingSlip.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoutingSlip.java index d977334..1068c85 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoutingSlip.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoutingSlip.java @@ -16,13 +16,22 @@ */ package org.apache.camel.management.mbean; +import java.util.Map; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import 
javax.management.openmbean.CompositeType; +import javax.management.openmbean.TabularData; +import javax.management.openmbean.TabularDataSupport; + import org.apache.camel.CamelContext; import org.apache.camel.api.management.ManagedResource; +import org.apache.camel.api.management.mbean.CamelOpenMBeanTypes; import org.apache.camel.api.management.mbean.ManagedRoutingSlipMBean; -import org.apache.camel.model.ProcessorDefinition; import org.apache.camel.model.RoutingSlipDefinition; import org.apache.camel.processor.RoutingSlip; +import org.apache.camel.spi.EndpointUtilizationStatistics; import org.apache.camel.spi.ManagementStrategy; +import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.URISupport; /** @@ -32,6 +41,7 @@ import org.apache.camel.util.URISupport; public class ManagedRoutingSlip extends ManagedProcessor implements ManagedRoutingSlipMBean { private final RoutingSlip processor; private String uri; + private boolean sanitize; public ManagedRoutingSlip(CamelContext context, RoutingSlip processor, RoutingSlipDefinition definition) { super(context, processor, definition); @@ -41,7 +51,7 @@ public class ManagedRoutingSlip extends ManagedProcessor implements ManagedRouti @Override public void init(ManagementStrategy strategy) { super.init(strategy); - boolean sanitize = strategy.getManagementAgent().getMask() != null ? strategy.getManagementAgent().getMask() : false; + sanitize = strategy.getManagementAgent().getMask() != null ? 
strategy.getManagementAgent().getMask() : false; uri = getDefinition().getExpression().getExpression(); if (sanitize) { uri = URISupport.sanitizeUri(uri); @@ -77,4 +87,34 @@ public class ManagedRoutingSlip extends ManagedProcessor implements ManagedRouti public Boolean isIgnoreInvalidEndpoints() { return processor.isIgnoreInvalidEndpoints(); } + + @Override + public TabularData endpointStatistics() { + try { + TabularData answer = new TabularDataSupport(CamelOpenMBeanTypes.endpointsUtilizationTabularType()); + + EndpointUtilizationStatistics stats = processor.getEndpointUtilizationStatistics(); + if (stats != null) { + for (Map.Entry<String, Long> entry : stats.getStatistics().entrySet()) { + CompositeType ct = CamelOpenMBeanTypes.endpointsUtilizationCompositeType(); + String url = entry.getKey(); + if (sanitize) { + url = URISupport.sanitizeUri(url); + } + + Long hits = entry.getValue(); + if (hits == null) { + hits = 0L; + } + + CompositeData data = new CompositeDataSupport(ct, new String[]{"url", "hits"}, new Object[]{url, hits}); + answer.put(data); + } + } + return answer; + } catch (Exception e) { + throw ObjectHelper.wrapRuntimeCamelException(e); + } + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/1a576fab/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedSendDynamicProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedSendDynamicProcessor.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedSendDynamicProcessor.java index 21a4207..0fb094b 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedSendDynamicProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedSendDynamicProcessor.java @@ -16,12 +16,22 @@ */ package org.apache.camel.management.mbean; +import java.util.Map; +import javax.management.openmbean.CompositeData; +import 
javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.TabularData; +import javax.management.openmbean.TabularDataSupport; + import org.apache.camel.CamelContext; import org.apache.camel.api.management.ManagedResource; +import org.apache.camel.api.management.mbean.CamelOpenMBeanTypes; import org.apache.camel.api.management.mbean.ManagedSendDynamicProcessorMBean; import org.apache.camel.model.ProcessorDefinition; import org.apache.camel.processor.SendDynamicProcessor; +import org.apache.camel.spi.EndpointUtilizationStatistics; import org.apache.camel.spi.ManagementStrategy; +import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.URISupport; /** @@ -31,6 +41,7 @@ import org.apache.camel.util.URISupport; public class ManagedSendDynamicProcessor extends ManagedProcessor implements ManagedSendDynamicProcessorMBean { private final SendDynamicProcessor processor; private String uri; + private boolean sanitize; public ManagedSendDynamicProcessor(CamelContext context, SendDynamicProcessor processor, ProcessorDefinition<?> definition) { super(context, processor, definition); @@ -39,7 +50,7 @@ public class ManagedSendDynamicProcessor extends ManagedProcessor implements Man public void init(ManagementStrategy strategy) { super.init(strategy); - boolean sanitize = strategy.getManagementAgent().getMask() != null ? strategy.getManagementAgent().getMask() : false; + this.sanitize = strategy.getManagementAgent().getMask() != null ? 
strategy.getManagementAgent().getMask() : false; if (sanitize) { uri = URISupport.sanitizeUri(processor.getUri()); } else { @@ -71,4 +82,33 @@ public class ManagedSendDynamicProcessor extends ManagedProcessor implements Man return processor.isIgnoreInvalidEndpoint(); } + @Override + public TabularData endpointStatistics() { + try { + TabularData answer = new TabularDataSupport(CamelOpenMBeanTypes.endpointsUtilizationTabularType()); + + EndpointUtilizationStatistics stats = processor.getEndpointUtilizationStatistics(); + if (stats != null) { + for (Map.Entry<String, Long> entry : stats.getStatistics().entrySet()) { + CompositeType ct = CamelOpenMBeanTypes.endpointsUtilizationCompositeType(); + String url = entry.getKey(); + if (sanitize) { + url = URISupport.sanitizeUri(url); + } + + Long hits = entry.getValue(); + if (hits == null) { + hits = 0L; + } + + CompositeData data = new CompositeDataSupport(ct, new String[]{"url", "hits"}, new Object[]{url, hits}); + answer.put(data); + } + } + return answer; + } catch (Exception e) { + throw ObjectHelper.wrapRuntimeCamelException(e); + } + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/1a576fab/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java b/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java index 3d14b69..98f8e45 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java +++ b/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java @@ -30,6 +30,7 @@ import org.apache.camel.impl.EmptyProducerCache; import org.apache.camel.impl.ProducerCache; import org.apache.camel.processor.aggregate.AggregationStrategy; import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy; +import org.apache.camel.spi.EndpointUtilizationStatistics; import org.apache.camel.spi.IdAware; 
import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; @@ -185,6 +186,10 @@ public class RecipientList extends ServiceSupport implements AsyncProcessor, IdA return ExchangeHelper.resolveEndpoint(exchange, recipient); } + public EndpointUtilizationStatistics getEndpointUtilizationStatistics() { + return producerCache.getEndpointUtilizationStatistics(); + } + protected void doStart() throws Exception { if (producerCache == null) { if (cacheSize < 0) { http://git-wip-us.apache.org/repos/asf/camel/blob/1a576fab/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java index c41112e..a8ca7e0 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java +++ b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java @@ -34,6 +34,7 @@ import org.apache.camel.builder.ExpressionBuilder; import org.apache.camel.impl.DefaultExchange; import org.apache.camel.impl.EmptyProducerCache; import org.apache.camel.impl.ProducerCache; +import org.apache.camel.spi.EndpointUtilizationStatistics; import org.apache.camel.spi.IdAware; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; @@ -409,6 +410,10 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace ServiceHelper.stopAndShutdownService(producerCache); } + public EndpointUtilizationStatistics getEndpointUtilizationStatistics() { + return producerCache.getEndpointUtilizationStatistics(); + } + /** * Returns the outbound message if available. Otherwise return the inbound message. 
*/ http://git-wip-us.apache.org/repos/asf/camel/blob/1a576fab/camel-core/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java index 1129ea1..8bf317d 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java @@ -29,6 +29,7 @@ import org.apache.camel.NoTypeConversionAvailableException; import org.apache.camel.Producer; import org.apache.camel.impl.EmptyProducerCache; import org.apache.camel.impl.ProducerCache; +import org.apache.camel.spi.EndpointUtilizationStatistics; import org.apache.camel.spi.IdAware; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; @@ -178,6 +179,10 @@ public class SendDynamicProcessor extends ServiceSupport implements AsyncProcess ServiceHelper.stopServices(producerCache); } + public EndpointUtilizationStatistics getEndpointUtilizationStatistics() { + return producerCache.getEndpointUtilizationStatistics(); + } + public CamelContext getCamelContext() { return camelContext; } http://git-wip-us.apache.org/repos/asf/camel/blob/1a576fab/camel-core/src/main/java/org/apache/camel/spi/EndpointUtilizationStatistics.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/spi/EndpointUtilizationStatistics.java b/camel-core/src/main/java/org/apache/camel/spi/EndpointUtilizationStatistics.java new file mode 100644 index 0000000..81b252e --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/spi/EndpointUtilizationStatistics.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.spi; + +import java.util.Map; + +/** + * Various statistics about endpoint utilization, such as from EIP patterns that use dynamic endpoints. + */ +public interface EndpointUtilizationStatistics { + + // TODO: document me + + int maxCapacity(); + + int size(); + + void onHit(String key); + + void remove(String key); + + Map<String, Long> getStatistics(); + + void clear(); + +} http://git-wip-us.apache.org/repos/asf/camel/blob/1a576fab/camel-core/src/main/java/org/apache/camel/spi/ManagementStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/spi/ManagementStrategy.java b/camel-core/src/main/java/org/apache/camel/spi/ManagementStrategy.java index 2e6453f..bc8ff06 100644 --- a/camel-core/src/main/java/org/apache/camel/spi/ManagementStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/spi/ManagementStrategy.java @@ -232,6 +232,20 @@ public interface ManagementStrategy extends Service { boolean isLoadStatisticsEnabled(); /** + * Sets whether extended statistics is enabled, such as each EIP keeping track of per-endpoint utilization.
+ * + * @param flag <tt>true</tt> to enable extended statistics + */ + void setExtendedStatisticsEnabled(boolean flag); + + /** + * Gets whether extended statistics is enabled + * + * @return <tt>true</tt> if enabled + */ + boolean isExtendedStatisticsEnabled(); + + /** * Sets the statistics level * <p/> * Default is {@link org.apache.camel.ManagementStatisticsLevel#All} http://git-wip-us.apache.org/repos/asf/camel/blob/1a576fab/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerCacheTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerCacheTest.java b/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerCacheTest.java index 7c43c9f..31269fb 100644 --- a/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerCacheTest.java +++ b/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerCacheTest.java @@ -16,6 +16,7 @@ */ package org.apache.camel.impl; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import org.apache.camel.Consumer; @@ -24,6 +25,7 @@ import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.Producer; +import org.apache.camel.spi.EndpointUtilizationStatistics; /** * @version @@ -102,6 +104,54 @@ public class DefaultProducerCacheTest extends ContextTestSupport { assertEquals(3, shutdownCounter.get()); } + public void testExtendedStatistics() throws Exception { + ProducerCache cache = new ProducerCache(this, context, 5); + cache.setExtendedStatistics(true); + cache.start(); + + assertEquals("Size should be 0", 0, cache.size()); + + // use 1 = 2 times + // use 2 = 3 times + // use 3..4 = 1 times + // use 5 = 0 times + Endpoint e = new MyEndpoint(true, 1); + Producer p = cache.acquireProducer(e); + cache.releaseProducer(e, p); + e = new MyEndpoint(true, 1); + p = cache.acquireProducer(e); + cache.releaseProducer(e, p); + e = new 
MyEndpoint(true, 2); + p = cache.acquireProducer(e); + cache.releaseProducer(e, p); + e = new MyEndpoint(true, 2); + p = cache.acquireProducer(e); + cache.releaseProducer(e, p); + e = new MyEndpoint(true, 2); + p = cache.acquireProducer(e); + cache.releaseProducer(e, p); + e = new MyEndpoint(true, 3); + p = cache.acquireProducer(e); + cache.releaseProducer(e, p); + e = new MyEndpoint(true, 4); + p = cache.acquireProducer(e); + cache.releaseProducer(e, p); + + assertEquals("Size should be 4", 4, cache.size()); + + EndpointUtilizationStatistics stats = cache.getEndpointUtilizationStatistics(); + assertEquals(4, stats.size()); + + Map<String, Long> recent = stats.getStatistics(); + assertEquals(2, recent.get("my://1").longValue()); + assertEquals(3, recent.get("my://2").longValue()); + assertEquals(1, recent.get("my://3").longValue()); + assertEquals(1, recent.get("my://4").longValue()); + assertNull(recent.get("my://5")); + + cache.stop(); + } + private final class MyEndpoint extends DefaultEndpoint { private final boolean isSingleton; http://git-wip-us.apache.org/repos/asf/camel/blob/1a576fab/camel-core/src/test/java/org/apache/camel/management/ManagedDynamicRouterTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedDynamicRouterTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedDynamicRouterTest.java index a3407ba..ccf10aa 100644 --- a/camel-core/src/test/java/org/apache/camel/management/ManagedDynamicRouterTest.java +++ b/camel-core/src/test/java/org/apache/camel/management/ManagedDynamicRouterTest.java @@ -20,6 +20,7 @@ import javax.management.MBeanServer; import javax.management.ObjectName; import javax.management.openmbean.TabularData; +import org.apache.camel.CamelContext; import org.apache.camel.ServiceStatus; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; @@ -29,6 +30,13 @@ import 
org.apache.camel.component.mock.MockEndpoint; */ public class ManagedDynamicRouterTest extends ManagementTestSupport { + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext context = super.createCamelContext(); + context.getManagementStrategy().setExtendedStatisticsEnabled(true); + return context; + } + public void testManageDynamicRouter() throws Exception { // JMX tests dont work well on AIX CI servers (hangs them) if (isPlatform("aix")) { @@ -36,9 +44,14 @@ public class ManagedDynamicRouterTest extends ManagementTestSupport { } MockEndpoint foo = getMockEndpoint("mock:foo"); - foo.expectedMessageCount(1); + foo.expectedMessageCount(2); + + MockEndpoint bar = getMockEndpoint("mock:bar"); + bar.expectedMessageCount(1); template.sendBodyAndHeader("direct:start", "Hello World", "whereTo", "direct:foo"); + template.sendBodyAndHeader("direct:start", "Bye World", "whereTo", "direct:foo"); + template.sendBodyAndHeader("direct:start", "Hi World", "whereTo", "direct:bar"); assertMockEndpointsSatisfied(); @@ -64,7 +77,11 @@ public class ManagedDynamicRouterTest extends ManagementTestSupport { String uri = (String) mbeanServer.getAttribute(on, "Expression"); assertEquals("whereTo", uri); - TabularData data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{false}, new String[]{"boolean"}); + TabularData data = (TabularData) mbeanServer.invoke(on, "endpointStatistics", null, null); + assertNotNull(data); + assertEquals(2, data.size()); + + data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{false}, new String[]{"boolean"}); assertNotNull(data); assertEquals(2, data.size()); @@ -88,6 +105,10 @@ public class ManagedDynamicRouterTest extends ManagementTestSupport { from("direct:foo") .to("mock:foo") .removeHeader("whereTo"); + + from("direct:bar") + .to("mock:bar") + .removeHeader("whereTo"); } }; } 
http://git-wip-us.apache.org/repos/asf/camel/blob/1a576fab/camel-core/src/test/java/org/apache/camel/management/ManagedRecipientListTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedRecipientListTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedRecipientListTest.java index b7a1721..a6b7328 100644 --- a/camel-core/src/test/java/org/apache/camel/management/ManagedRecipientListTest.java +++ b/camel-core/src/test/java/org/apache/camel/management/ManagedRecipientListTest.java @@ -20,6 +20,7 @@ import javax.management.MBeanServer; import javax.management.ObjectName; import javax.management.openmbean.TabularData; +import org.apache.camel.CamelContext; import org.apache.camel.ServiceStatus; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; @@ -29,6 +30,13 @@ import org.apache.camel.component.mock.MockEndpoint; */ public class ManagedRecipientListTest extends ManagementTestSupport { + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext context = super.createCamelContext(); + context.getManagementStrategy().setExtendedStatisticsEnabled(true); + return context; + } + public void testManageRecipientList() throws Exception { // JMX tests dont work well on AIX CI servers (hangs them) if (isPlatform("aix")) { @@ -36,9 +44,14 @@ public class ManagedRecipientListTest extends ManagementTestSupport { } MockEndpoint foo = getMockEndpoint("mock:foo"); - foo.expectedMessageCount(1); + foo.expectedMessageCount(2); + + MockEndpoint bar = getMockEndpoint("mock:bar"); + bar.expectedMessageCount(1); - template.sendBodyAndHeader("direct:start", "Hello World", "foo", "mock:foo"); + template.sendBodyAndHeader("direct:start", "Hello World", "whereto", "mock:foo"); + template.sendBodyAndHeader("direct:start", "Bye World", "whereto", "mock:foo"); + 
template.sendBodyAndHeader("direct:start", "Hi World", "whereto", "mock:bar"); assertMockEndpointsSatisfied(); @@ -65,9 +78,13 @@ public class ManagedRecipientListTest extends ManagementTestSupport { assertEquals("header", lan); String uri = (String) mbeanServer.getAttribute(on, "Expression"); - assertEquals("foo", uri); + assertEquals("whereto", uri); + + TabularData data = (TabularData) mbeanServer.invoke(on, "endpointStatistics", null, null); + assertNotNull(data); + assertEquals(2, data.size()); - TabularData data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{false}, new String[]{"boolean"}); + data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{false}, new String[]{"boolean"}); assertNotNull(data); assertEquals(2, data.size()); @@ -86,7 +103,7 @@ public class ManagedRecipientListTest extends ManagementTestSupport { @Override public void configure() throws Exception { from("direct:start") - .recipientList(header("foo")).id("mysend"); + .recipientList(header("whereto")).id("mysend"); } }; } http://git-wip-us.apache.org/repos/asf/camel/blob/1a576fab/camel-core/src/test/java/org/apache/camel/management/ManagedRoutingSlipTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedRoutingSlipTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedRoutingSlipTest.java index 7876d25..0ebf549 100644 --- a/camel-core/src/test/java/org/apache/camel/management/ManagedRoutingSlipTest.java +++ b/camel-core/src/test/java/org/apache/camel/management/ManagedRoutingSlipTest.java @@ -20,6 +20,7 @@ import javax.management.MBeanServer; import javax.management.ObjectName; import javax.management.openmbean.TabularData; +import org.apache.camel.CamelContext; import org.apache.camel.ServiceStatus; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; @@ -29,6 +30,13 @@ import 
org.apache.camel.component.mock.MockEndpoint; */ public class ManagedRoutingSlipTest extends ManagementTestSupport { + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext context = super.createCamelContext(); + context.getManagementStrategy().setExtendedStatisticsEnabled(true); + return context; + } + public void testManageRoutingSlip() throws Exception { // JMX tests dont work well on AIX CI servers (hangs them) if (isPlatform("aix")) { @@ -36,9 +44,12 @@ public class ManagedRoutingSlipTest extends ManagementTestSupport { } MockEndpoint foo = getMockEndpoint("mock:foo"); - foo.expectedMessageCount(1); + foo.expectedMessageCount(2); + + MockEndpoint bar = getMockEndpoint("mock:bar"); + bar.expectedMessageCount(1); - template.sendBodyAndHeader("direct:start", "Hello World", "whereTo", "direct:foo"); + template.sendBodyAndHeader("direct:start", "Hello World", "whereTo", "direct:foo,direct:foo,direct:bar"); assertMockEndpointsSatisfied(); @@ -64,7 +75,11 @@ public class ManagedRoutingSlipTest extends ManagementTestSupport { String uri = (String) mbeanServer.getAttribute(on, "Expression"); assertEquals("whereTo", uri); - TabularData data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{false}, new String[]{"boolean"}); + TabularData data = (TabularData) mbeanServer.invoke(on, "endpointStatistics", null, null); + assertNotNull(data); + assertEquals(2, data.size()); + + data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{false}, new String[]{"boolean"}); assertNotNull(data); assertEquals(3, data.size()); @@ -87,6 +102,9 @@ public class ManagedRoutingSlipTest extends ManagementTestSupport { from("direct:foo") .to("mock:foo"); + + from("direct:bar") + .to("mock:bar"); } }; } http://git-wip-us.apache.org/repos/asf/camel/blob/1a576fab/camel-core/src/test/java/org/apache/camel/management/ManagedSendDynamicProcessorTest.java ---------------------------------------------------------------------- diff --git 
a/camel-core/src/test/java/org/apache/camel/management/ManagedSendDynamicProcessorTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedSendDynamicProcessorTest.java index 38c67cb..2c2b895 100644 --- a/camel-core/src/test/java/org/apache/camel/management/ManagedSendDynamicProcessorTest.java +++ b/camel-core/src/test/java/org/apache/camel/management/ManagedSendDynamicProcessorTest.java @@ -20,6 +20,7 @@ import javax.management.MBeanServer; import javax.management.ObjectName; import javax.management.openmbean.TabularData; +import org.apache.camel.CamelContext; import org.apache.camel.ServiceStatus; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; @@ -29,6 +30,13 @@ import org.apache.camel.component.mock.MockEndpoint; */ public class ManagedSendDynamicProcessorTest extends ManagementTestSupport { + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext context = super.createCamelContext(); + context.getManagementStrategy().setExtendedStatisticsEnabled(true); + return context; + } + public void testManageSendDynamicProcessor() throws Exception { // JMX tests dont work well on AIX CI servers (hangs them) if (isPlatform("aix")) { @@ -36,9 +44,14 @@ public class ManagedSendDynamicProcessorTest extends ManagementTestSupport { } MockEndpoint foo = getMockEndpoint("mock:foo"); - foo.expectedMessageCount(1); + foo.expectedMessageCount(2); + + MockEndpoint bar = getMockEndpoint("mock:bar"); + bar.expectedMessageCount(1); template.sendBodyAndHeader("direct:start", "Hello World", "whereto", "foo"); + template.sendBodyAndHeader("direct:start", "Bye World", "whereto", "foo"); + template.sendBodyAndHeader("direct:start", "Hi World", "whereto", "bar"); assertMockEndpointsSatisfied(); @@ -64,7 +77,11 @@ public class ManagedSendDynamicProcessorTest extends ManagementTestSupport { String pattern = (String) mbeanServer.getAttribute(on, "MessageExchangePattern"); assertNull(pattern); - 
TabularData data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{false}, new String[]{"boolean"}); + TabularData data = (TabularData) mbeanServer.invoke(on, "endpointStatistics", null, null); + assertNotNull(data); + assertEquals(2, data.size()); + + data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{false}, new String[]{"boolean"}); assertNotNull(data); assertEquals(2, data.size()); @@ -88,6 +105,8 @@ public class ManagedSendDynamicProcessorTest extends ManagementTestSupport { .toD("direct:${header.whereto}").id("mysend"); from("direct:foo").to("mock:foo"); + + from("direct:bar").to("mock:bar"); } }; }