CAMEL-8526: Add more EIP as specialized mbeans

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c744d59f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c744d59f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c744d59f

Branch: refs/heads/master
Commit: c744d59f69b357305a236c38001c367a82993b51
Parents: c0661ca
Author: Claus Ibsen <davscl...@apache.org>
Authored: Sat Jul 25 09:42:47 2015 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Sat Jul 25 10:18:17 2015 +0200

----------------------------------------------------------------------
 .../ManagedCircuitBreakerLoadBalancerMBean.java |  5 ++
 .../ManagedCircuitBreakerLoadBalancer.java      | 50 ++++++++++++++++++
 .../mbean/ManagedFailoverLoadBalancer.java      |  1 -
 .../CircuitBreakerLoadBalancer.java             | 55 ++++++++++++++++++--
 .../ExceptionFailureStatistics.java             |  6 ++-
 .../loadbalancer/FailOverLoadBalancer.java      |  3 +-
 .../ManagedCircuitBreakerLoadBalancerTest.java  | 30 +++++++++--
 7 files changed, 137 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/c744d59f/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedCircuitBreakerLoadBalancerMBean.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedCircuitBreakerLoadBalancerMBean.java
 
b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedCircuitBreakerLoadBalancerMBean.java
index 87682db..53d2ec9 100644
--- 
a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedCircuitBreakerLoadBalancerMBean.java
+++ 
b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedCircuitBreakerLoadBalancerMBean.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.api.management.mbean;
 
+import javax.management.openmbean.TabularData;
+
 import org.apache.camel.api.management.ManagedAttribute;
 import org.apache.camel.api.management.ManagedOperation;
 
@@ -39,4 +41,7 @@ public interface ManagedCircuitBreakerLoadBalancerMBean 
extends ManagedProcessor
     @ManagedOperation(description = "Dumps the state of the load balancer")
     String dumpState();
 
+    @ManagedOperation(description = "Statistics of the content based router 
for each exception")
+    TabularData exceptionStatistics();
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/c744d59f/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCircuitBreakerLoadBalancer.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCircuitBreakerLoadBalancer.java
 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCircuitBreakerLoadBalancer.java
index 89c7078..f0a8389 100644
--- 
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCircuitBreakerLoadBalancer.java
+++ 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCircuitBreakerLoadBalancer.java
@@ -16,14 +16,24 @@
  */
 package org.apache.camel.management.mbean;
 
+import java.util.Iterator;
 import java.util.List;
 
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
+
 import org.apache.camel.CamelContext;
 import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.api.management.mbean.CamelOpenMBeanTypes;
 import 
org.apache.camel.api.management.mbean.ManagedCircuitBreakerLoadBalancerMBean;
 import org.apache.camel.model.LoadBalanceDefinition;
 import org.apache.camel.processor.loadbalancer.CircuitBreakerLoadBalancer;
+import org.apache.camel.processor.loadbalancer.ExceptionFailureStatistics;
 import org.apache.camel.util.CollectionStringBuffer;
+import org.apache.camel.util.ObjectHelper;
 
 /**
  * @version 
@@ -88,4 +98,44 @@ public class ManagedCircuitBreakerLoadBalancer extends 
ManagedProcessor implemen
     public String dumpState() {
         return processor.dumpState();
     }
+
+    @Override
+    public TabularData exceptionStatistics() {
+        try {
+            TabularData answer = new 
TabularDataSupport(CamelOpenMBeanTypes.loadbalancerExceptionsTabularType());
+
+            ExceptionFailureStatistics statistics = 
processor.getExceptionFailureStatistics();
+
+            Iterator<Class<?>> it = statistics.getExceptions();
+            boolean empty = true;
+            while (it.hasNext()) {
+                empty = false;
+                Class<?> exception = it.next();
+                String name = ObjectHelper.name(exception);
+                long counter = statistics.getFailureCounter(exception);
+
+                CompositeType ct = 
CamelOpenMBeanTypes.loadbalancerExceptionsCompositeType();
+                CompositeData data = new CompositeDataSupport(ct,
+                        new String[]{"exception", "failures"},
+                        new Object[]{name, counter});
+                answer.put(data);
+            }
+            if (empty) {
+                // use Exception as a single general
+                String name = ObjectHelper.name(Exception.class);
+                long counter = statistics.getFailureCounter(Exception.class);
+
+                CompositeType ct = 
CamelOpenMBeanTypes.loadbalancerExceptionsCompositeType();
+                CompositeData data = new CompositeDataSupport(ct,
+                        new String[]{"exception", "failures"},
+                        new Object[]{name, counter});
+                answer.put(data);
+            }
+
+            return answer;
+        } catch (Exception e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/c744d59f/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedFailoverLoadBalancer.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedFailoverLoadBalancer.java
 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedFailoverLoadBalancer.java
index 469db17..d51972c 100644
--- 
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedFailoverLoadBalancer.java
+++ 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedFailoverLoadBalancer.java
@@ -150,5 +150,4 @@ public class ManagedFailoverLoadBalancer extends 
ManagedProcessor implements Man
         }
     }
 
-
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/c744d59f/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/CircuitBreakerLoadBalancer.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/CircuitBreakerLoadBalancer.java
 
b/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/CircuitBreakerLoadBalancer.java
index 39f3efc..abb2758 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/CircuitBreakerLoadBalancer.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/CircuitBreakerLoadBalancer.java
@@ -39,15 +39,19 @@ public class CircuitBreakerLoadBalancer extends 
LoadBalancerSupport implements T
     private int threshold;
     private long halfOpenAfter;
     private long lastFailure;
+
+    // stateful statistics
     private AtomicInteger failures = new AtomicInteger();
     private AtomicInteger state = new AtomicInteger(STATE_CLOSED);
+    private final ExceptionFailureStatistics statistics = new 
ExceptionFailureStatistics();
 
-    public CircuitBreakerLoadBalancer(List<Class<?>> exceptions) {
-        this.exceptions = exceptions;
+    public CircuitBreakerLoadBalancer() {
+        this(null);
     }
 
-    public CircuitBreakerLoadBalancer() {
-        this.exceptions = null;
+    public CircuitBreakerLoadBalancer(List<Class<?>> exceptions) {
+        this.exceptions = exceptions;
+        statistics.init(exceptions);
     }
 
     public void setHalfOpenAfter(long halfOpenAfter) {
@@ -84,21 +88,38 @@ public class CircuitBreakerLoadBalancer extends 
LoadBalancerSupport implements T
         return exceptions;
     }
 
+    /**
+     * Has the given Exchange failed
+     */
     protected boolean hasFailed(Exchange exchange) {
+        if (exchange == null) {
+            return false;
+        }
+
         boolean answer = false;
 
         if (exchange.getException() != null) {
             if (exceptions == null || exceptions.isEmpty()) {
+                // always failover if no exceptions defined
                 answer = true;
             } else {
                 for (Class<?> exception : exceptions) {
+                    // will look in exception hierarchy
                     if (exchange.getException(exception) != null) {
                         answer = true;
                         break;
                     }
                 }
             }
+
+            if (answer) {
+                // record the failure in the statistics
+                statistics.onHandledFailure(exchange.getException());
+            }
         }
+
+        log.trace("Failed: {} for exchangeId: {}", answer, 
exchange.getExchangeId());
+
         return answer;
     }
 
@@ -243,6 +264,32 @@ public class CircuitBreakerLoadBalancer extends 
LoadBalancerSupport implements T
         return "circuitbreaker";
     }
 
+    public ExceptionFailureStatistics getExceptionFailureStatistics() {
+        return statistics;
+    }
+
+    public void reset() {
+        // reset state
+        failures.set(0);
+        state.set(STATE_CLOSED);
+        statistics.reset();
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        // reset state
+        reset();
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+        // noop
+    }
+
+
     class CircuitBreakerCallback implements AsyncCallback {
         private final AsyncCallback callback;
         private final Exchange exchange;

http://git-wip-us.apache.org/repos/asf/camel/blob/c744d59f/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/ExceptionFailureStatistics.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/ExceptionFailureStatistics.java
 
b/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/ExceptionFailureStatistics.java
index 82ad894..a1e5f5b 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/ExceptionFailureStatistics.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/ExceptionFailureStatistics.java
@@ -31,8 +31,10 @@ public class ExceptionFailureStatistics {
     private final AtomicLong fallbackCounter = new AtomicLong();
 
     public void init(List<Class<?>> exceptions) {
-        for (Class<?> exception : exceptions) {
-            counters.put(exception, new AtomicLong());
+        if (exceptions != null) {
+            for (Class<?> exception : exceptions) {
+                counters.put(exception, new AtomicLong());
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/c744d59f/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
 
b/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
index af2a511..d02df0e 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
@@ -46,7 +46,7 @@ public class FailOverLoadBalancer extends LoadBalancerSupport 
implements Traceab
     private boolean sticky;
     private int maximumFailoverAttempts = -1;
 
-    // stateful counter
+    // stateful statistics
     private final AtomicInteger counter = new AtomicInteger(-1);
     private final AtomicInteger lastGoodIndex = new AtomicInteger(-1);
     private final ExceptionFailureStatistics statistics = new 
ExceptionFailureStatistics();
@@ -383,6 +383,7 @@ public class FailOverLoadBalancer extends 
LoadBalancerSupport implements Traceab
         // reset state
         lastGoodIndex.set(-1);
         counter.set(-1);
+        statistics.reset();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/c744d59f/camel-core/src/test/java/org/apache/camel/management/ManagedCircuitBreakerLoadBalancerTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/management/ManagedCircuitBreakerLoadBalancerTest.java
 
b/camel-core/src/test/java/org/apache/camel/management/ManagedCircuitBreakerLoadBalancerTest.java
index d8f06e1..f187faf 100644
--- 
a/camel-core/src/test/java/org/apache/camel/management/ManagedCircuitBreakerLoadBalancerTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/management/ManagedCircuitBreakerLoadBalancerTest.java
@@ -22,6 +22,8 @@ import javax.management.MBeanServer;
 import javax.management.ObjectName;
 import javax.management.openmbean.TabularData;
 
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
 import org.apache.camel.ServiceStatus;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
@@ -37,10 +39,24 @@ public class ManagedCircuitBreakerLoadBalancerTest extends 
ManagementTestSupport
             return;
         }
 
-        MockEndpoint foo = getMockEndpoint("mock:foo");
-        foo.expectedMessageCount(1);
+        getMockEndpoint("mock:foo").whenExchangeReceived(1, new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                throw new SQLException("Forced");
+            }
+        });
 
-        template.sendBodyAndHeader("direct:start", "Hello World", "foo", 
"123");
+        MockEndpoint foo = getMockEndpoint("mock:foo");
+        foo.expectedBodiesReceived("Hello World", "Bye World");
+
+        try {
+            template.sendBodyAndHeader("direct:start", "Hello World", "foo", 
"123");
+            fail("Should fail");
+        } catch (Exception e) {
+            assertIsInstanceOf(SQLException.class, e.getCause());
+            assertEquals("Forced", e.getCause().getMessage());
+        }
+        template.sendBodyAndHeader("direct:start", "Bye World", "foo", "123");
 
         assertMockEndpointsSatisfied();
 
@@ -76,9 +92,13 @@ public class ManagedCircuitBreakerLoadBalancerTest extends 
ManagementTestSupport
         assertEquals("closed", cbState);
 
         String dump = (String) mbeanServer.invoke(on, "dumpState", null, null);
-        assertEquals("State closed, failures 0", dump);
+        assertTrue(dump.startsWith("State closed, failures 0, closed since"));
+
+        TabularData data = (TabularData) mbeanServer.invoke(on, 
"exceptionStatistics", null, null);
+        assertNotNull(data);
+        assertEquals(2, data.size());
 
-        TabularData data = (TabularData) mbeanServer.invoke(on, "explain", new 
Object[]{false}, new String[]{"boolean"});
+        data = (TabularData) mbeanServer.invoke(on, "explain", new 
Object[]{false}, new String[]{"boolean"});
         assertNotNull(data);
         assertEquals(2, data.size());
 

Reply via email to