CAMEL-9014: Option to turn on extended JMX statistics for EIPs to track fine grained utilization statistics such as which and how often they send to dynamic endpoints.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/20ee1734 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/20ee1734 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/20ee1734 Branch: refs/heads/master Commit: 20ee17344effa78848c352fbf4ec41aaa7077e4d Parents: c330b8a Author: Claus Ibsen <davscl...@apache.org> Authored: Sat Jul 25 13:49:32 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sat Jul 25 13:59:39 2015 +0200 ---------------------------------------------------------------------- .../management/mbean/ManagedWireTapMBean.java | 6 +++ .../mbean/ManagedWireTapProcessor.java | 42 +++++++++++++++++++- .../camel/processor/WireTapProcessor.java | 5 +++ .../camel/management/ManagedWireTapTest.java | 23 ++++++++++- 4 files changed, 73 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/20ee1734/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedWireTapMBean.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedWireTapMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedWireTapMBean.java index ac60c49..6010c35 100644 --- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedWireTapMBean.java +++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedWireTapMBean.java @@ -16,7 +16,10 @@ */ package org.apache.camel.api.management.mbean; +import javax.management.openmbean.TabularData; + import org.apache.camel.api.management.ManagedAttribute; +import org.apache.camel.api.management.ManagedOperation; public interface ManagedWireTapMBean extends ManagedProcessorMBean { @@ -32,4 +35,7 @@ public interface ManagedWireTapMBean extends ManagedProcessorMBean { @ManagedAttribute(description = "Uses a copy of the original exchange") Boolean isCopy(); + @ManagedOperation(description = "Statistics of the endpoints which has been sent to") + TabularData endpointStatistics(); + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/20ee1734/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedWireTapProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedWireTapProcessor.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedWireTapProcessor.java index 88b72ad..1fad1ac 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedWireTapProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedWireTapProcessor.java @@ -16,12 +16,22 @@ */ package org.apache.camel.management.mbean; +import java.util.Map; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.TabularData; +import javax.management.openmbean.TabularDataSupport; + import org.apache.camel.CamelContext; import org.apache.camel.api.management.ManagedResource; +import org.apache.camel.api.management.mbean.CamelOpenMBeanTypes; import org.apache.camel.api.management.mbean.ManagedWireTapMBean; import org.apache.camel.model.ProcessorDefinition; import org.apache.camel.processor.WireTapProcessor; +import org.apache.camel.spi.EndpointUtilizationStatistics; import org.apache.camel.spi.ManagementStrategy; +import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.URISupport; /** @@ -31,6 +41,7 @@ import org.apache.camel.util.URISupport; public class ManagedWireTapProcessor extends ManagedProcessor implements ManagedWireTapMBean { private final WireTapProcessor processor; private String uri; + private boolean sanitize; public ManagedWireTapProcessor(CamelContext context, WireTapProcessor processor, ProcessorDefinition<?> definition) { super(context, processor, definition); @@ -39,7 +50,7 @@ public class ManagedWireTapProcessor extends ManagedProcessor implements Managed public void init(ManagementStrategy strategy) { super.init(strategy); - boolean sanitize = strategy.getManagementAgent().getMask() != null ? strategy.getManagementAgent().getMask() : false; + sanitize = strategy.getManagementAgent().getMask() != null ? strategy.getManagementAgent().getMask() : false; if (sanitize) { uri = URISupport.sanitizeUri(processor.getUri()); } else { @@ -67,4 +78,33 @@ public class ManagedWireTapProcessor extends ManagedProcessor implements Managed return processor.isCopy(); } + @Override + public TabularData endpointStatistics() { + try { + TabularData answer = new TabularDataSupport(CamelOpenMBeanTypes.endpointsUtilizationTabularType()); + + EndpointUtilizationStatistics stats = processor.getEndpointUtilizationStatistics(); + if (stats != null) { + for (Map.Entry<String, Long> entry : stats.getStatistics().entrySet()) { + CompositeType ct = CamelOpenMBeanTypes.endpointsUtilizationCompositeType(); + String url = entry.getKey(); + if (sanitize) { + url = URISupport.sanitizeUri(url); + } + + Long hits = entry.getValue(); + if (hits == null) { + hits = 0L; + } + + CompositeData data = new CompositeDataSupport(ct, new String[]{"url", "hits"}, new Object[]{url, hits}); + answer.put(data); + } + } + return answer; + } catch (Exception e) { + throw ObjectHelper.wrapRuntimeCamelException(e); + } + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/20ee1734/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java index bde5fca..c6df8f5 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java @@ -34,6 +34,7 @@ import org.apache.camel.Processor; import org.apache.camel.StreamCache; import org.apache.camel.Traceable; import org.apache.camel.impl.DefaultExchange; +import org.apache.camel.spi.EndpointUtilizationStatistics; import org.apache.camel.spi.IdAware; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; @@ -103,6 +104,10 @@ public class WireTapProcessor extends ServiceSupport implements AsyncProcessor, this.camelContext = camelContext; } + public EndpointUtilizationStatistics getEndpointUtilizationStatistics() { + return dynamicProcessor.getEndpointUtilizationStatistics(); + } + public void process(Exchange exchange) throws Exception { AsyncProcessorHelper.process(this, exchange); } http://git-wip-us.apache.org/repos/asf/camel/blob/20ee1734/camel-core/src/test/java/org/apache/camel/management/ManagedWireTapTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedWireTapTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedWireTapTest.java index 0946a99..58b9943 100644 --- a/camel-core/src/test/java/org/apache/camel/management/ManagedWireTapTest.java +++ b/camel-core/src/test/java/org/apache/camel/management/ManagedWireTapTest.java @@ -20,6 +20,7 @@ import javax.management.MBeanServer; import javax.management.ObjectName; import javax.management.openmbean.TabularData; +import org.apache.camel.CamelContext; import org.apache.camel.ServiceStatus; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; @@ -29,6 +30,13 @@ import org.apache.camel.component.mock.MockEndpoint; */ public class ManagedWireTapTest extends ManagementTestSupport { + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext context = super.createCamelContext(); + context.getManagementStrategy().setExtendedStatisticsEnabled(true); + return context; + } + public void testManageWireTap() throws Exception { // JMX tests dont work well on AIX CI servers (hangs them) if (isPlatform("aix")) { @@ -36,9 +44,14 @@ public class ManagedWireTapTest extends ManagementTestSupport { } MockEndpoint foo = getMockEndpoint("mock:foo"); - foo.expectedMessageCount(1); + foo.expectedMessageCount(2); + + MockEndpoint bar = getMockEndpoint("mock:bar"); + bar.expectedMessageCount(1); template.sendBodyAndHeader("direct:start", "Hello World", "whereto", "foo"); + template.sendBodyAndHeader("direct:start", "Bye World", "whereto", "foo"); + template.sendBodyAndHeader("direct:start", "Hi World", "whereto", "bar"); assertMockEndpointsSatisfied(); @@ -61,7 +74,11 @@ public class ManagedWireTapTest extends ManagementTestSupport { String uri = (String) mbeanServer.getAttribute(on, "Uri"); assertEquals("direct:${header.whereto}", uri); - TabularData data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{false}, new String[]{"boolean"}); + TabularData data = (TabularData) mbeanServer.invoke(on, "endpointStatistics", null, null); + assertNotNull(data); + assertEquals(2, data.size()); + + data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{false}, new String[]{"boolean"}); assertNotNull(data); assertEquals(3, data.size()); @@ -85,6 +102,8 @@ public class ManagedWireTapTest extends ManagementTestSupport { .wireTap("direct:${header.whereto}").id("mysend"); from("direct:foo").to("mock:foo"); + + from("direct:bar").to("mock:bar"); } }; }