CAMEL-9014: Option to turn on extended JMX statistics for EIPs to track fine 
grained utilization statistics such as which and how often they send to dynamic 
endpoints.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/20ee1734
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/20ee1734
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/20ee1734

Branch: refs/heads/master
Commit: 20ee17344effa78848c352fbf4ec41aaa7077e4d
Parents: c330b8a
Author: Claus Ibsen <davscl...@apache.org>
Authored: Sat Jul 25 13:49:32 2015 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Sat Jul 25 13:59:39 2015 +0200

----------------------------------------------------------------------
 .../management/mbean/ManagedWireTapMBean.java   |  6 +++
 .../mbean/ManagedWireTapProcessor.java          | 42 +++++++++++++++++++-
 .../camel/processor/WireTapProcessor.java       |  5 +++
 .../camel/management/ManagedWireTapTest.java    | 23 ++++++++++-
 4 files changed, 73 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/20ee1734/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedWireTapMBean.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedWireTapMBean.java
 
b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedWireTapMBean.java
index ac60c49..6010c35 100644
--- 
a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedWireTapMBean.java
+++ 
b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedWireTapMBean.java
@@ -16,7 +16,10 @@
  */
 package org.apache.camel.api.management.mbean;
 
+import javax.management.openmbean.TabularData;
+
 import org.apache.camel.api.management.ManagedAttribute;
+import org.apache.camel.api.management.ManagedOperation;
 
 public interface ManagedWireTapMBean extends ManagedProcessorMBean {
 
@@ -32,4 +35,7 @@ public interface ManagedWireTapMBean extends 
ManagedProcessorMBean {
     @ManagedAttribute(description = "Uses a copy of the original exchange")
     Boolean isCopy();
 
+    @ManagedOperation(description = "Statistics of the endpoints which has 
been sent to")
+    TabularData endpointStatistics();
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/20ee1734/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedWireTapProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedWireTapProcessor.java
 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedWireTapProcessor.java
index 88b72ad..1fad1ac 100644
--- 
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedWireTapProcessor.java
+++ 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedWireTapProcessor.java
@@ -16,12 +16,22 @@
  */
 package org.apache.camel.management.mbean;
 
+import java.util.Map;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
+
 import org.apache.camel.CamelContext;
 import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.api.management.mbean.CamelOpenMBeanTypes;
 import org.apache.camel.api.management.mbean.ManagedWireTapMBean;
 import org.apache.camel.model.ProcessorDefinition;
 import org.apache.camel.processor.WireTapProcessor;
+import org.apache.camel.spi.EndpointUtilizationStatistics;
 import org.apache.camel.spi.ManagementStrategy;
+import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.URISupport;
 
 /**
@@ -31,6 +41,7 @@ import org.apache.camel.util.URISupport;
 public class ManagedWireTapProcessor extends ManagedProcessor implements 
ManagedWireTapMBean {
     private final WireTapProcessor processor;
     private String uri;
+    private boolean sanitize;
 
     public ManagedWireTapProcessor(CamelContext context, WireTapProcessor 
processor, ProcessorDefinition<?> definition) {
         super(context, processor, definition);
@@ -39,7 +50,7 @@ public class ManagedWireTapProcessor extends ManagedProcessor 
implements Managed
 
     public void init(ManagementStrategy strategy) {
         super.init(strategy);
-        boolean sanitize = strategy.getManagementAgent().getMask() != null ? 
strategy.getManagementAgent().getMask() : false;
+        sanitize = strategy.getManagementAgent().getMask() != null ? 
strategy.getManagementAgent().getMask() : false;
         if (sanitize) {
             uri = URISupport.sanitizeUri(processor.getUri());
         } else {
@@ -67,4 +78,33 @@ public class ManagedWireTapProcessor extends 
ManagedProcessor implements Managed
         return processor.isCopy();
     }
 
+    @Override
+    public TabularData endpointStatistics() {
+        try {
+            TabularData answer = new 
TabularDataSupport(CamelOpenMBeanTypes.endpointsUtilizationTabularType());
+
+            EndpointUtilizationStatistics stats = 
processor.getEndpointUtilizationStatistics();
+            if (stats != null) {
+                for (Map.Entry<String, Long> entry : 
stats.getStatistics().entrySet()) {
+                    CompositeType ct = 
CamelOpenMBeanTypes.endpointsUtilizationCompositeType();
+                    String url = entry.getKey();
+                    if (sanitize) {
+                        url = URISupport.sanitizeUri(url);
+                    }
+
+                    Long hits = entry.getValue();
+                    if (hits == null) {
+                        hits = 0L;
+                    }
+
+                    CompositeData data = new CompositeDataSupport(ct, new 
String[]{"url", "hits"}, new Object[]{url, hits});
+                    answer.put(data);
+                }
+            }
+            return answer;
+        } catch (Exception e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/20ee1734/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java 
b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
index bde5fca..c6df8f5 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
@@ -34,6 +34,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.StreamCache;
 import org.apache.camel.Traceable;
 import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.spi.EndpointUtilizationStatistics;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.AsyncProcessorHelper;
@@ -103,6 +104,10 @@ public class WireTapProcessor extends ServiceSupport 
implements AsyncProcessor,
         this.camelContext = camelContext;
     }
 
+    public EndpointUtilizationStatistics getEndpointUtilizationStatistics() {
+        return dynamicProcessor.getEndpointUtilizationStatistics();
+    }
+
     public void process(Exchange exchange) throws Exception {
         AsyncProcessorHelper.process(this, exchange);
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/20ee1734/camel-core/src/test/java/org/apache/camel/management/ManagedWireTapTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/management/ManagedWireTapTest.java 
b/camel-core/src/test/java/org/apache/camel/management/ManagedWireTapTest.java
index 0946a99..58b9943 100644
--- 
a/camel-core/src/test/java/org/apache/camel/management/ManagedWireTapTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/management/ManagedWireTapTest.java
@@ -20,6 +20,7 @@ import javax.management.MBeanServer;
 import javax.management.ObjectName;
 import javax.management.openmbean.TabularData;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.ServiceStatus;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
@@ -29,6 +30,13 @@ import org.apache.camel.component.mock.MockEndpoint;
  */
 public class ManagedWireTapTest extends ManagementTestSupport {
 
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext context = super.createCamelContext();
+        context.getManagementStrategy().setExtendedStatisticsEnabled(true);
+        return context;
+    }
+
     public void testManageWireTap() throws Exception {
         // JMX tests dont work well on AIX CI servers (hangs them)
         if (isPlatform("aix")) {
@@ -36,9 +44,14 @@ public class ManagedWireTapTest extends 
ManagementTestSupport {
         }
 
         MockEndpoint foo = getMockEndpoint("mock:foo");
-        foo.expectedMessageCount(1);
+        foo.expectedMessageCount(2);
+
+        MockEndpoint bar = getMockEndpoint("mock:bar");
+        bar.expectedMessageCount(1);
 
         template.sendBodyAndHeader("direct:start", "Hello World", "whereto", 
"foo");
+        template.sendBodyAndHeader("direct:start", "Bye World", "whereto", 
"foo");
+        template.sendBodyAndHeader("direct:start", "Hi World", "whereto", 
"bar");
 
         assertMockEndpointsSatisfied();
 
@@ -61,7 +74,11 @@ public class ManagedWireTapTest extends 
ManagementTestSupport {
         String uri = (String) mbeanServer.getAttribute(on, "Uri");
         assertEquals("direct:${header.whereto}", uri);
 
-        TabularData data = (TabularData) mbeanServer.invoke(on, "explain", new 
Object[]{false}, new String[]{"boolean"});
+        TabularData data = (TabularData) mbeanServer.invoke(on, 
"endpointStatistics", null, null);
+        assertNotNull(data);
+        assertEquals(2, data.size());
+
+        data = (TabularData) mbeanServer.invoke(on, "explain", new 
Object[]{false}, new String[]{"boolean"});
         assertNotNull(data);
         assertEquals(3, data.size());
 
@@ -85,6 +102,8 @@ public class ManagedWireTapTest extends 
ManagementTestSupport {
                     .wireTap("direct:${header.whereto}").id("mysend");
 
                 from("direct:foo").to("mock:foo");
+
+                from("direct:bar").to("mock:bar");
             }
         };
     }

Reply via email to