CAMEL-8526: Add more EIP as specialized mbeans
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c0661ca5 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c0661ca5 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c0661ca5 Branch: refs/heads/master Commit: c0661ca5dc9329c27bf6be732c92a06451fbe3c8 Parents: 487d7d7 Author: Claus Ibsen <davscl...@apache.org> Authored: Sat Jul 25 09:27:19 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sat Jul 25 10:18:17 2015 +0200 ---------------------------------------------------------------------- .../management/mbean/CamelOpenMBeanTypes.java | 10 +++ .../mbean/ManagedFailoverLoadBalancerMBean.java | 6 ++ .../mbean/ManagedFailoverLoadBalancer.java | 49 ++++++++++++++ .../ExceptionFailureStatistics.java | 70 ++++++++++++++++++++ .../loadbalancer/FailOverLoadBalancer.java | 12 ++++ .../ManagedFailoverLoadBalancerTest.java | 21 ++++-- 6 files changed, 164 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/c0661ca5/camel-core/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java index c9894a1..9b45573 100644 --- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java +++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java @@ -158,4 +158,14 @@ public final class CamelOpenMBeanTypes { new OpenType[]{SimpleType.STRING, SimpleType.STRING, SimpleType.LONG}); } + public static TabularType loadbalancerExceptionsTabularType() throws OpenDataException { + CompositeType ct = loadbalancerExceptionsCompositeType(); + return new TabularType("exception", "Exception statistics", ct, new String[]{"exception"}); + } + + public static CompositeType loadbalancerExceptionsCompositeType() throws OpenDataException { + return new CompositeType("exceptions", "Exceptions", new String[]{"exception", "failures"}, + new String[]{"Exception", "Failures"}, + new OpenType[]{SimpleType.STRING, SimpleType.LONG}); + } } http://git-wip-us.apache.org/repos/asf/camel/blob/c0661ca5/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedFailoverLoadBalancerMBean.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedFailoverLoadBalancerMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedFailoverLoadBalancerMBean.java index fec3f4e..476b7c7 100644 --- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedFailoverLoadBalancerMBean.java +++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedFailoverLoadBalancerMBean.java @@ -16,7 +16,10 @@ */ package org.apache.camel.api.management.mbean; +import javax.management.openmbean.TabularData; + import org.apache.camel.api.management.ManagedAttribute; +import org.apache.camel.api.management.ManagedOperation; public interface ManagedFailoverLoadBalancerMBean extends ManagedProcessorMBean { @@ -38,4 +41,7 @@ public interface ManagedFailoverLoadBalancerMBean extends ManagedProcessorMBean @ManagedAttribute(description = "Processor id of the last known good processor that succeed processing the exchange") String getLastGoodProcessorId(); + @ManagedOperation(description = "Statistics of the content based router for each exception") + TabularData exceptionStatistics(); + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/c0661ca5/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedFailoverLoadBalancer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedFailoverLoadBalancer.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedFailoverLoadBalancer.java index 34c03c7..469db17 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedFailoverLoadBalancer.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedFailoverLoadBalancer.java @@ -16,15 +16,24 @@ */ package org.apache.camel.management.mbean; +import java.util.Iterator; import java.util.List; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.TabularData; +import javax.management.openmbean.TabularDataSupport; import org.apache.camel.CamelContext; import org.apache.camel.api.management.ManagedResource; +import org.apache.camel.api.management.mbean.CamelOpenMBeanTypes; import org.apache.camel.api.management.mbean.ManagedFailoverLoadBalancerMBean; import org.apache.camel.model.LoadBalanceDefinition; import org.apache.camel.model.ProcessorDefinition; +import org.apache.camel.processor.loadbalancer.ExceptionFailureStatistics; import org.apache.camel.processor.loadbalancer.FailOverLoadBalancer; import org.apache.camel.util.CollectionStringBuffer; +import org.apache.camel.util.ObjectHelper; /** * @version @@ -102,4 +111,44 @@ public class ManagedFailoverLoadBalancer extends ManagedProcessor implements Man return null; } + @Override + public TabularData exceptionStatistics() { + try { + TabularData answer = new TabularDataSupport(CamelOpenMBeanTypes.loadbalancerExceptionsTabularType()); + + ExceptionFailureStatistics statistics = processor.getExceptionFailureStatistics(); + + Iterator<Class<?>> it = statistics.getExceptions(); + boolean empty = true; + while (it.hasNext()) { + empty = false; + Class<?> exception = it.next(); + String name = ObjectHelper.name(exception); + long counter = statistics.getFailureCounter(exception); + + CompositeType ct = CamelOpenMBeanTypes.loadbalancerExceptionsCompositeType(); + CompositeData data = new CompositeDataSupport(ct, + new String[]{"exception", "failures"}, + new Object[]{name, counter}); + answer.put(data); + } + if (empty) { + // use Exception as a single general + String name = ObjectHelper.name(Exception.class); + long counter = statistics.getFailureCounter(Exception.class); + + CompositeType ct = CamelOpenMBeanTypes.loadbalancerExceptionsCompositeType(); + CompositeData data = new CompositeDataSupport(ct, + new String[]{"exception", "failures"}, + new Object[]{name, counter}); + answer.put(data); + } + + return answer; + } catch (Exception e) { + throw ObjectHelper.wrapRuntimeCamelException(e); + } + } + + } http://git-wip-us.apache.org/repos/asf/camel/blob/c0661ca5/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/ExceptionFailureStatistics.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/ExceptionFailureStatistics.java b/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/ExceptionFailureStatistics.java new file mode 100644 index 0000000..82ad894 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/ExceptionFailureStatistics.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.loadbalancer; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Statistics about exception failures for load balancers that reacts on exceptions + */ +public class ExceptionFailureStatistics { + + private final Map<Class<?>, AtomicLong> counters = new HashMap<Class<?>, AtomicLong>(); + private final AtomicLong fallbackCounter = new AtomicLong(); + + public void init(List<Class<?>> exceptions) { + for (Class<?> exception : exceptions) { + counters.put(exception, new AtomicLong()); + } + } + + public Iterator<Class<?>> getExceptions() { + return counters.keySet().iterator(); + } + + public long getFailureCounter(Class<?> exception) { + AtomicLong counter = counters.get(exception); + if (counter != null) { + return counter.get(); + } else { + return fallbackCounter.get(); + } + } + + public void onHandledFailure(Exception exception) { + Class<?> clazz = exception.getClass(); + + AtomicLong counter = counters.get(clazz); + if (counter != null) { + counter.incrementAndGet(); + } else { + fallbackCounter.incrementAndGet(); + } + } + + public void reset() { + for (AtomicLong counter : counters.values()) { + counter.set(0); + } + fallbackCounter.set(0); + } +} + http://git-wip-us.apache.org/repos/asf/camel/blob/c0661ca5/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java b/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java index 5eb5d8b..af2a511 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java +++ b/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java @@ -49,6 +49,7 @@ public class FailOverLoadBalancer extends LoadBalancerSupport implements Traceab // stateful counter private final AtomicInteger counter = new AtomicInteger(-1); private final AtomicInteger lastGoodIndex = new AtomicInteger(-1); + private final ExceptionFailureStatistics statistics = new ExceptionFailureStatistics(); public FailOverLoadBalancer() { this.exceptions = null; @@ -63,6 +64,8 @@ public class FailOverLoadBalancer extends LoadBalancerSupport implements Traceab throw new IllegalArgumentException("Class is not an instance of Throwable: " + type); } } + + statistics.init(exceptions); } @Override @@ -133,6 +136,11 @@ public class FailOverLoadBalancer extends LoadBalancerSupport implements Traceab } } } + + if (answer) { + // record the failure in the statistics + statistics.onHandledFailure(exchange.getException()); + } } log.trace("Should failover: {} for exchangeId: {}", answer, exchange.getExchangeId()); @@ -367,6 +375,10 @@ public class FailOverLoadBalancer extends LoadBalancerSupport implements Traceab return "failover"; } + public ExceptionFailureStatistics getExceptionFailureStatistics() { + return statistics; + } + public void reset() { // reset state lastGoodIndex.set(-1); http://git-wip-us.apache.org/repos/asf/camel/blob/c0661ca5/camel-core/src/test/java/org/apache/camel/management/ManagedFailoverLoadBalancerTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedFailoverLoadBalancerTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedFailoverLoadBalancerTest.java index 5720afd..f13d08c 100644 --- a/camel-core/src/test/java/org/apache/camel/management/ManagedFailoverLoadBalancerTest.java +++ b/camel-core/src/test/java/org/apache/camel/management/ManagedFailoverLoadBalancerTest.java @@ -22,6 +22,8 @@ import javax.management.MBeanServer; import javax.management.ObjectName; import javax.management.openmbean.TabularData; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; import org.apache.camel.ServiceStatus; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; @@ -37,8 +39,15 @@ public class ManagedFailoverLoadBalancerTest extends ManagementTestSupport { return; } - MockEndpoint foo = getMockEndpoint("mock:foo"); - foo.expectedMessageCount(1); + getMockEndpoint("mock:foo").whenAnyExchangeReceived(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + throw new IOException("Forced"); + } + }); + + MockEndpoint bar = getMockEndpoint("mock:bar"); + bar.expectedMessageCount(1); template.sendBodyAndHeader("direct:start", "Hello World", "foo", "123"); @@ -76,9 +85,13 @@ public class ManagedFailoverLoadBalancerTest extends ManagementTestSupport { assertEquals("java.io.IOException,java.sql.SQLException", exceptions); String id = (String) mbeanServer.getAttribute(on, "LastGoodProcessorId"); - assertEquals("foo", id); + assertEquals("bar", id); + + TabularData data = (TabularData) mbeanServer.invoke(on, "exceptionStatistics", null, null); + assertNotNull(data); + assertEquals(2, data.size()); - TabularData data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{false}, new String[]{"boolean"}); + data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{false}, new String[]{"boolean"}); assertNotNull(data); assertEquals(3, data.size());