CAMEL-8526: Add more EIPs as specialized MBeans
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c744d59f Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c744d59f Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c744d59f Branch: refs/heads/master Commit: c744d59f69b357305a236c38001c367a82993b51 Parents: c0661ca Author: Claus Ibsen <davscl...@apache.org> Authored: Sat Jul 25 09:42:47 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sat Jul 25 10:18:17 2015 +0200 ---------------------------------------------------------------------- .../ManagedCircuitBreakerLoadBalancerMBean.java | 5 ++ .../ManagedCircuitBreakerLoadBalancer.java | 50 ++++++++++++++++++ .../mbean/ManagedFailoverLoadBalancer.java | 1 - .../CircuitBreakerLoadBalancer.java | 55 ++++++++++++++++++-- .../ExceptionFailureStatistics.java | 6 ++- .../loadbalancer/FailOverLoadBalancer.java | 3 +- .../ManagedCircuitBreakerLoadBalancerTest.java | 30 +++++++++-- 7 files changed, 137 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/c744d59f/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedCircuitBreakerLoadBalancerMBean.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedCircuitBreakerLoadBalancerMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedCircuitBreakerLoadBalancerMBean.java index 87682db..53d2ec9 100644 --- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedCircuitBreakerLoadBalancerMBean.java +++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedCircuitBreakerLoadBalancerMBean.java @@ -16,6 +16,8 @@ */ package org.apache.camel.api.management.mbean; +import javax.management.openmbean.TabularData; + import 
org.apache.camel.api.management.ManagedAttribute; import org.apache.camel.api.management.ManagedOperation; @@ -39,4 +41,7 @@ public interface ManagedCircuitBreakerLoadBalancerMBean extends ManagedProcessor @ManagedOperation(description = "Dumps the state of the load balancer") String dumpState(); + @ManagedOperation(description = "Statistics of the content based router for each exception") + TabularData exceptionStatistics(); + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/c744d59f/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCircuitBreakerLoadBalancer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCircuitBreakerLoadBalancer.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCircuitBreakerLoadBalancer.java index 89c7078..f0a8389 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCircuitBreakerLoadBalancer.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCircuitBreakerLoadBalancer.java @@ -16,14 +16,24 @@ */ package org.apache.camel.management.mbean; +import java.util.Iterator; import java.util.List; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.TabularData; +import javax.management.openmbean.TabularDataSupport; + import org.apache.camel.CamelContext; import org.apache.camel.api.management.ManagedResource; +import org.apache.camel.api.management.mbean.CamelOpenMBeanTypes; import org.apache.camel.api.management.mbean.ManagedCircuitBreakerLoadBalancerMBean; import org.apache.camel.model.LoadBalanceDefinition; import org.apache.camel.processor.loadbalancer.CircuitBreakerLoadBalancer; +import org.apache.camel.processor.loadbalancer.ExceptionFailureStatistics; import 
org.apache.camel.util.CollectionStringBuffer; +import org.apache.camel.util.ObjectHelper; /** * @version @@ -88,4 +98,44 @@ public class ManagedCircuitBreakerLoadBalancer extends ManagedProcessor implemen public String dumpState() { return processor.dumpState(); } + + @Override + public TabularData exceptionStatistics() { + try { + TabularData answer = new TabularDataSupport(CamelOpenMBeanTypes.loadbalancerExceptionsTabularType()); + + ExceptionFailureStatistics statistics = processor.getExceptionFailureStatistics(); + + Iterator<Class<?>> it = statistics.getExceptions(); + boolean empty = true; + while (it.hasNext()) { + empty = false; + Class<?> exception = it.next(); + String name = ObjectHelper.name(exception); + long counter = statistics.getFailureCounter(exception); + + CompositeType ct = CamelOpenMBeanTypes.loadbalancerExceptionsCompositeType(); + CompositeData data = new CompositeDataSupport(ct, + new String[]{"exception", "failures"}, + new Object[]{name, counter}); + answer.put(data); + } + if (empty) { + // use Exception as a single general + String name = ObjectHelper.name(Exception.class); + long counter = statistics.getFailureCounter(Exception.class); + + CompositeType ct = CamelOpenMBeanTypes.loadbalancerExceptionsCompositeType(); + CompositeData data = new CompositeDataSupport(ct, + new String[]{"exception", "failures"}, + new Object[]{name, counter}); + answer.put(data); + } + + return answer; + } catch (Exception e) { + throw ObjectHelper.wrapRuntimeCamelException(e); + } + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/c744d59f/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedFailoverLoadBalancer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedFailoverLoadBalancer.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedFailoverLoadBalancer.java index 469db17..d51972c 100644 --- 
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedFailoverLoadBalancer.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedFailoverLoadBalancer.java @@ -150,5 +150,4 @@ public class ManagedFailoverLoadBalancer extends ManagedProcessor implements Man } } - } http://git-wip-us.apache.org/repos/asf/camel/blob/c744d59f/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/CircuitBreakerLoadBalancer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/CircuitBreakerLoadBalancer.java b/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/CircuitBreakerLoadBalancer.java index 39f3efc..abb2758 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/CircuitBreakerLoadBalancer.java +++ b/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/CircuitBreakerLoadBalancer.java @@ -39,15 +39,19 @@ public class CircuitBreakerLoadBalancer extends LoadBalancerSupport implements T private int threshold; private long halfOpenAfter; private long lastFailure; + + // stateful statistics private AtomicInteger failures = new AtomicInteger(); private AtomicInteger state = new AtomicInteger(STATE_CLOSED); + private final ExceptionFailureStatistics statistics = new ExceptionFailureStatistics(); - public CircuitBreakerLoadBalancer(List<Class<?>> exceptions) { - this.exceptions = exceptions; + public CircuitBreakerLoadBalancer() { + this(null); } - public CircuitBreakerLoadBalancer() { - this.exceptions = null; + public CircuitBreakerLoadBalancer(List<Class<?>> exceptions) { + this.exceptions = exceptions; + statistics.init(exceptions); } public void setHalfOpenAfter(long halfOpenAfter) { @@ -84,21 +88,38 @@ public class CircuitBreakerLoadBalancer extends LoadBalancerSupport implements T return exceptions; } + /** + * Has the given Exchange failed + */ protected boolean hasFailed(Exchange exchange) 
{ + if (exchange == null) { + return false; + } + boolean answer = false; if (exchange.getException() != null) { if (exceptions == null || exceptions.isEmpty()) { + // always failover if no exceptions defined answer = true; } else { for (Class<?> exception : exceptions) { + // will look in exception hierarchy if (exchange.getException(exception) != null) { answer = true; break; } } } + + if (answer) { + // record the failure in the statistics + statistics.onHandledFailure(exchange.getException()); + } } + + log.trace("Failed: {} for exchangeId: {}", answer, exchange.getExchangeId()); + return answer; } @@ -243,6 +264,32 @@ public class CircuitBreakerLoadBalancer extends LoadBalancerSupport implements T return "circuitbreaker"; } + public ExceptionFailureStatistics getExceptionFailureStatistics() { + return statistics; + } + + public void reset() { + // reset state + failures.set(0); + state.set(STATE_CLOSED); + statistics.reset(); + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + + // reset state + reset(); + } + + @Override + protected void doStop() throws Exception { + super.doStop(); + // noop + } + + class CircuitBreakerCallback implements AsyncCallback { private final AsyncCallback callback; private final Exchange exchange; http://git-wip-us.apache.org/repos/asf/camel/blob/c744d59f/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/ExceptionFailureStatistics.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/ExceptionFailureStatistics.java b/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/ExceptionFailureStatistics.java index 82ad894..a1e5f5b 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/ExceptionFailureStatistics.java +++ b/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/ExceptionFailureStatistics.java @@ -31,8 +31,10 @@ public class 
ExceptionFailureStatistics { private final AtomicLong fallbackCounter = new AtomicLong(); public void init(List<Class<?>> exceptions) { - for (Class<?> exception : exceptions) { - counters.put(exception, new AtomicLong()); + if (exceptions != null) { + for (Class<?> exception : exceptions) { + counters.put(exception, new AtomicLong()); + } } } http://git-wip-us.apache.org/repos/asf/camel/blob/c744d59f/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java b/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java index af2a511..d02df0e 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java +++ b/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java @@ -46,7 +46,7 @@ public class FailOverLoadBalancer extends LoadBalancerSupport implements Traceab private boolean sticky; private int maximumFailoverAttempts = -1; - // stateful counter + // stateful statistics private final AtomicInteger counter = new AtomicInteger(-1); private final AtomicInteger lastGoodIndex = new AtomicInteger(-1); private final ExceptionFailureStatistics statistics = new ExceptionFailureStatistics(); @@ -383,6 +383,7 @@ public class FailOverLoadBalancer extends LoadBalancerSupport implements Traceab // reset state lastGoodIndex.set(-1); counter.set(-1); + statistics.reset(); } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/c744d59f/camel-core/src/test/java/org/apache/camel/management/ManagedCircuitBreakerLoadBalancerTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedCircuitBreakerLoadBalancerTest.java 
b/camel-core/src/test/java/org/apache/camel/management/ManagedCircuitBreakerLoadBalancerTest.java index d8f06e1..f187faf 100644 --- a/camel-core/src/test/java/org/apache/camel/management/ManagedCircuitBreakerLoadBalancerTest.java +++ b/camel-core/src/test/java/org/apache/camel/management/ManagedCircuitBreakerLoadBalancerTest.java @@ -22,6 +22,8 @@ import javax.management.MBeanServer; import javax.management.ObjectName; import javax.management.openmbean.TabularData; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; import org.apache.camel.ServiceStatus; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; @@ -37,10 +39,24 @@ public class ManagedCircuitBreakerLoadBalancerTest extends ManagementTestSupport return; } - MockEndpoint foo = getMockEndpoint("mock:foo"); - foo.expectedMessageCount(1); + getMockEndpoint("mock:foo").whenExchangeReceived(1, new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + throw new SQLException("Forced"); + } + }); - template.sendBodyAndHeader("direct:start", "Hello World", "foo", "123"); + MockEndpoint foo = getMockEndpoint("mock:foo"); + foo.expectedBodiesReceived("Hello World", "Bye World"); + + try { + template.sendBodyAndHeader("direct:start", "Hello World", "foo", "123"); + fail("Should fail"); + } catch (Exception e) { + assertIsInstanceOf(SQLException.class, e.getCause()); + assertEquals("Forced", e.getCause().getMessage()); + } + template.sendBodyAndHeader("direct:start", "Bye World", "foo", "123"); assertMockEndpointsSatisfied(); @@ -76,9 +92,13 @@ public class ManagedCircuitBreakerLoadBalancerTest extends ManagementTestSupport assertEquals("closed", cbState); String dump = (String) mbeanServer.invoke(on, "dumpState", null, null); - assertEquals("State closed, failures 0", dump); + assertTrue(dump.startsWith("State closed, failures 0, closed since")); + + TabularData data = (TabularData) mbeanServer.invoke(on, 
"exceptionStatistics", null, null); + assertNotNull(data); + assertEquals(2, data.size()); - TabularData data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{false}, new String[]{"boolean"}); + data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{false}, new String[]{"boolean"}); assertNotNull(data); assertEquals(2, data.size());