CAMEL-8526: Add more EIP as specialized mbeans

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c0661ca5
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c0661ca5
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c0661ca5

Branch: refs/heads/master
Commit: c0661ca5dc9329c27bf6be732c92a06451fbe3c8
Parents: 487d7d7
Author: Claus Ibsen <davscl...@apache.org>
Authored: Sat Jul 25 09:27:19 2015 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Sat Jul 25 10:18:17 2015 +0200

----------------------------------------------------------------------
 .../management/mbean/CamelOpenMBeanTypes.java   | 10 +++
 .../mbean/ManagedFailoverLoadBalancerMBean.java |  6 ++
 .../mbean/ManagedFailoverLoadBalancer.java      | 49 ++++++++++++++
 .../ExceptionFailureStatistics.java             | 70 ++++++++++++++++++++
 .../loadbalancer/FailOverLoadBalancer.java      | 12 ++++
 .../ManagedFailoverLoadBalancerTest.java        | 21 ++++--
 6 files changed, 164 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/c0661ca5/camel-core/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java
 
b/camel-core/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java
index c9894a1..9b45573 100644
--- 
a/camel-core/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java
+++ 
b/camel-core/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java
@@ -158,4 +158,14 @@ public final class CamelOpenMBeanTypes {
                 new OpenType[]{SimpleType.STRING, SimpleType.STRING, 
SimpleType.LONG});
     }
 
+    public static TabularType loadbalancerExceptionsTabularType() throws 
OpenDataException {
+        CompositeType ct = loadbalancerExceptionsCompositeType();
+        return new TabularType("exception", "Exception statistics", ct, new 
String[]{"exception"});
+    }
+
+    public static CompositeType loadbalancerExceptionsCompositeType() throws 
OpenDataException {
+        return new CompositeType("exceptions", "Exceptions", new 
String[]{"exception", "failures"},
+                new String[]{"Exception", "Failures"},
+                new OpenType[]{SimpleType.STRING, SimpleType.LONG});
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/c0661ca5/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedFailoverLoadBalancerMBean.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedFailoverLoadBalancerMBean.java
 
b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedFailoverLoadBalancerMBean.java
index fec3f4e..476b7c7 100644
--- 
a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedFailoverLoadBalancerMBean.java
+++ 
b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedFailoverLoadBalancerMBean.java
@@ -16,7 +16,10 @@
  */
 package org.apache.camel.api.management.mbean;
 
+import javax.management.openmbean.TabularData;
+
 import org.apache.camel.api.management.ManagedAttribute;
+import org.apache.camel.api.management.ManagedOperation;
 
 public interface ManagedFailoverLoadBalancerMBean extends 
ManagedProcessorMBean {
 
@@ -38,4 +41,7 @@ public interface ManagedFailoverLoadBalancerMBean extends 
ManagedProcessorMBean
     @ManagedAttribute(description = "Processor id of the last known good 
processor that succeed processing the exchange")
     String getLastGoodProcessorId();
 
+    /**
+     * Gets the failure statistics of the failover load balancer, one row per exception
+     * with the exception name and the number of recorded failures.
+     */
+    @ManagedOperation(description = "Statistics of the failover load balancer for each exception")
+    TabularData exceptionStatistics();
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/c0661ca5/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedFailoverLoadBalancer.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedFailoverLoadBalancer.java
 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedFailoverLoadBalancer.java
index 34c03c7..469db17 100644
--- 
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedFailoverLoadBalancer.java
+++ 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedFailoverLoadBalancer.java
@@ -16,15 +16,24 @@
  */
 package org.apache.camel.management.mbean;
 
+import java.util.Iterator;
 import java.util.List;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.api.management.mbean.CamelOpenMBeanTypes;
 import org.apache.camel.api.management.mbean.ManagedFailoverLoadBalancerMBean;
 import org.apache.camel.model.LoadBalanceDefinition;
 import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.processor.loadbalancer.ExceptionFailureStatistics;
 import org.apache.camel.processor.loadbalancer.FailOverLoadBalancer;
 import org.apache.camel.util.CollectionStringBuffer;
+import org.apache.camel.util.ObjectHelper;
 
 /**
  * @version 
@@ -102,4 +111,44 @@ public class ManagedFailoverLoadBalancer extends 
ManagedProcessor implements Man
         return null;
     }
 
+    /**
+     * Reports how many failures the load balancer has recorded for each exception.
+     * <p/>
+     * There is one row for each exception known to the statistics. If the statistics
+     * know no exceptions at all, a single row for {@link Exception} is reported instead.
+     *
+     * @return the statistics as tabular data with the <tt>exception</tt> and <tt>failures</tt> items
+     */
+    @Override
+    public TabularData exceptionStatistics() {
+        try {
+            TabularData answer = new TabularDataSupport(CamelOpenMBeanTypes.loadbalancerExceptionsTabularType());
+            // all rows share the same type, so create it once instead of once per row
+            CompositeType ct = CamelOpenMBeanTypes.loadbalancerExceptionsCompositeType();
+
+            ExceptionFailureStatistics statistics = processor.getExceptionFailureStatistics();
+
+            Iterator<Class<?>> it = statistics.getExceptions();
+            if (!it.hasNext()) {
+                // use Exception as a single general
+                answer.put(createExceptionRow(ct, statistics, Exception.class));
+            }
+            while (it.hasNext()) {
+                answer.put(createExceptionRow(ct, statistics, it.next()));
+            }
+
+            return answer;
+        } catch (Exception e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        }
+    }
+
+    /**
+     * Creates a single row holding the name of the exception and its current failure counter.
+     */
+    private static CompositeData createExceptionRow(CompositeType ct, ExceptionFailureStatistics statistics, Class<?> exception)
+        throws javax.management.openmbean.OpenDataException {
+        String name = ObjectHelper.name(exception);
+        long counter = statistics.getFailureCounter(exception);
+        return new CompositeDataSupport(ct,
+                new String[]{"exception", "failures"},
+                new Object[]{name, counter});
+    }
+
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/c0661ca5/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/ExceptionFailureStatistics.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/ExceptionFailureStatistics.java
 
b/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/ExceptionFailureStatistics.java
new file mode 100644
index 0000000..82ad894
--- /dev/null
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/ExceptionFailureStatistics.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.loadbalancer;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Statistics about exception failures for load balancers that react to exceptions.
+ * <p/>
+ * A counter is kept for each exception class registered with {@link #init(java.util.List)}.
+ * Failures caused by any other exception class are counted by a single shared fallback counter.
+ */
+public class ExceptionFailureStatistics {
+
+    private final Map<Class<?>, AtomicLong> counters = new HashMap<Class<?>, AtomicLong>();
+    private final AtomicLong fallbackCounter = new AtomicLong();
+
+    /**
+     * Registers a counter, starting at zero, for each of the given exception classes.
+     *
+     * @param exceptions the exception classes to keep individual counters for
+     */
+    public void init(List<Class<?>> exceptions) {
+        for (Class<?> exception : exceptions) {
+            counters.put(exception, new AtomicLong());
+        }
+    }
+
+    /**
+     * Gets the exception classes which have an individual counter.
+     *
+     * @return a read-only iterator; calling <tt>remove</tt> on it is not supported
+     */
+    public Iterator<Class<?>> getExceptions() {
+        // do not hand out the live key set iterator, as its remove method would delete counters
+        return Collections.unmodifiableSet(counters.keySet()).iterator();
+    }
+
+    /**
+     * Gets the number of failures recorded for the given exception class.
+     *
+     * @param exception the exception class
+     * @return the counter of the exception class, or the fallback counter if the class has no individual counter
+     */
+    public long getFailureCounter(Class<?> exception) {
+        AtomicLong counter = counters.get(exception);
+        if (counter != null) {
+            return counter.get();
+        } else {
+            return fallbackCounter.get();
+        }
+    }
+
+    /**
+     * Records a failure caused by the given exception.
+     *
+     * @param exception the exception that caused the failure
+     */
+    public void onHandledFailure(Exception exception) {
+        // NOTE(review): the lookup is by the exact class, so a subclass of a registered exception
+        // is counted by the fallback counter - confirm this is intended by the load balancers
+        Class<?> clazz = exception.getClass();
+
+        AtomicLong counter = counters.get(clazz);
+        if (counter != null) {
+            counter.incrementAndGet();
+        } else {
+            fallbackCounter.incrementAndGet();
+        }
+    }
+
+    /**
+     * Resets all the counters, including the fallback counter, to zero.
+     */
+    public void reset() {
+        for (AtomicLong counter : counters.values()) {
+            counter.set(0);
+        }
+        fallbackCounter.set(0);
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/camel/blob/c0661ca5/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
 
b/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
index 5eb5d8b..af2a511 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
@@ -49,6 +49,7 @@ public class FailOverLoadBalancer extends LoadBalancerSupport 
implements Traceab
     // stateful counter
     private final AtomicInteger counter = new AtomicInteger(-1);
     private final AtomicInteger lastGoodIndex = new AtomicInteger(-1);
+    private final ExceptionFailureStatistics statistics = new 
ExceptionFailureStatistics();
 
     public FailOverLoadBalancer() {
         this.exceptions = null;
@@ -63,6 +64,8 @@ public class FailOverLoadBalancer extends LoadBalancerSupport 
implements Traceab
                 throw new IllegalArgumentException("Class is not an instance 
of Throwable: " + type);
             }
         }
+
+        statistics.init(exceptions);
     }
 
     @Override
@@ -133,6 +136,11 @@ public class FailOverLoadBalancer extends 
LoadBalancerSupport implements Traceab
                     }
                 }
             }
+
+            if (answer) {
+                // record the failure in the statistics
+                statistics.onHandledFailure(exchange.getException());
+            }
         }
 
         log.trace("Should failover: {} for exchangeId: {}", answer, 
exchange.getExchangeId());
@@ -367,6 +375,10 @@ public class FailOverLoadBalancer extends 
LoadBalancerSupport implements Traceab
         return "failover";
     }
 
+    public ExceptionFailureStatistics getExceptionFailureStatistics() {
+        return statistics;
+    }
+
     public void reset() {
         // reset state
         lastGoodIndex.set(-1);

http://git-wip-us.apache.org/repos/asf/camel/blob/c0661ca5/camel-core/src/test/java/org/apache/camel/management/ManagedFailoverLoadBalancerTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/management/ManagedFailoverLoadBalancerTest.java
 
b/camel-core/src/test/java/org/apache/camel/management/ManagedFailoverLoadBalancerTest.java
index 5720afd..f13d08c 100644
--- 
a/camel-core/src/test/java/org/apache/camel/management/ManagedFailoverLoadBalancerTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/management/ManagedFailoverLoadBalancerTest.java
@@ -22,6 +22,8 @@ import javax.management.MBeanServer;
 import javax.management.ObjectName;
 import javax.management.openmbean.TabularData;
 
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
 import org.apache.camel.ServiceStatus;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
@@ -37,8 +39,15 @@ public class ManagedFailoverLoadBalancerTest extends 
ManagementTestSupport {
             return;
         }
 
-        MockEndpoint foo = getMockEndpoint("mock:foo");
-        foo.expectedMessageCount(1);
+        getMockEndpoint("mock:foo").whenAnyExchangeReceived(new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                throw new IOException("Forced");
+            }
+        });
+
+        MockEndpoint bar = getMockEndpoint("mock:bar");
+        bar.expectedMessageCount(1);
 
         template.sendBodyAndHeader("direct:start", "Hello World", "foo", 
"123");
 
@@ -76,9 +85,13 @@ public class ManagedFailoverLoadBalancerTest extends 
ManagementTestSupport {
         assertEquals("java.io.IOException,java.sql.SQLException", exceptions);
 
         String id = (String) mbeanServer.getAttribute(on, 
"LastGoodProcessorId");
-        assertEquals("foo", id);
+        assertEquals("bar", id);
+
+        TabularData data = (TabularData) mbeanServer.invoke(on, 
"exceptionStatistics", null, null);
+        assertNotNull(data);
+        assertEquals(2, data.size());
 
-        TabularData data = (TabularData) mbeanServer.invoke(on, "explain", new 
Object[]{false}, new String[]{"boolean"});
+        data = (TabularData) mbeanServer.invoke(on, "explain", new 
Object[]{false}, new String[]{"boolean"});
         assertNotNull(data);
         assertEquals(3, data.size());
 

Reply via email to