CAMEL-8526: Add more EIPs as specialized MBeans
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5122afb6 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5122afb6 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5122afb6 Branch: refs/heads/master Commit: 5122afb6c8c0ab8b95787364c1f72a3b5ecc2645 Parents: b40d713 Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Jul 24 18:18:31 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Jul 24 18:57:34 2015 +0200 ---------------------------------------------------------------------- .../management/mbean/CamelOpenMBeanTypes.java | 11 ++++ .../management/mbean/ManagedChoiceMBean.java | 7 +++ .../camel/management/mbean/ManagedChoice.java | 62 ++++++++++++++++++++ .../apache/camel/processor/ChoiceProcessor.java | 23 +++++++- .../apache/camel/processor/FilterProcessor.java | 23 ++++++-- .../camel/management/ManagedChoiceTest.java | 12 +++- 6 files changed, 127 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/5122afb6/camel-core/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java index de84321..c9894a1 100644 --- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java +++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java @@ -147,4 +147,15 @@ public final class CamelOpenMBeanTypes { new OpenType[]{SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING}); } + public static TabularType choiceTabularType() throws OpenDataException { + CompositeType ct = 
choiceCompositeType(); + return new TabularType("choice", "Choice statistics", ct, new String[]{"predicate"}); + } + + public static CompositeType choiceCompositeType() throws OpenDataException { + return new CompositeType("predicates", "Predicates", new String[]{"predicate", "language", "matches"}, + new String[]{"Predicate", "Language", "Matches"}, + new OpenType[]{SimpleType.STRING, SimpleType.STRING, SimpleType.LONG}); + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/5122afb6/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedChoiceMBean.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedChoiceMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedChoiceMBean.java index 64ae27b..32c2bf8 100644 --- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedChoiceMBean.java +++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedChoiceMBean.java @@ -16,6 +16,13 @@ */ package org.apache.camel.api.management.mbean; +import javax.management.openmbean.TabularData; + +import org.apache.camel.api.management.ManagedOperation; + public interface ManagedChoiceMBean extends ManagedProcessorMBean { + @ManagedOperation(description = "Statistics of the content based router for each predicate") + TabularData choiceStatistics(); + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/5122afb6/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedChoice.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedChoice.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedChoice.java index d4a8d67..09aaf1a 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedChoice.java +++ 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedChoice.java @@ -16,11 +16,23 @@ */ package org.apache.camel.management.mbean; +import java.util.List; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.TabularData; +import javax.management.openmbean.TabularDataSupport; + import org.apache.camel.CamelContext; import org.apache.camel.api.management.ManagedResource; +import org.apache.camel.api.management.mbean.CamelOpenMBeanTypes; import org.apache.camel.api.management.mbean.ManagedChoiceMBean; +import org.apache.camel.model.ChoiceDefinition; import org.apache.camel.model.ProcessorDefinition; +import org.apache.camel.model.WhenDefinition; import org.apache.camel.processor.ChoiceProcessor; +import org.apache.camel.processor.FilterProcessor; +import org.apache.camel.util.ObjectHelper; /** * @version @@ -34,4 +46,54 @@ public class ManagedChoice extends ManagedProcessor implements ManagedChoiceMBea this.processor = processor; } + @Override + public ChoiceDefinition getDefinition() { + return (ChoiceDefinition) super.getDefinition(); + } + + @Override + public synchronized void reset() { + processor.reset(); + super.reset(); + } + + @Override + public TabularData choiceStatistics() { + try { + TabularData answer = new TabularDataSupport(CamelOpenMBeanTypes.choiceTabularType()); + + List<WhenDefinition> whens = getDefinition().getWhenClauses(); + List<FilterProcessor> filters = processor.getFilters(); + + for (int i = 0; i < filters.size(); i++) { + WhenDefinition when = whens.get(i); + FilterProcessor filter = filters.get(i); + + CompositeType ct = CamelOpenMBeanTypes.choiceCompositeType(); + String predicate = when.getExpression().getExpression(); + String language = when.getExpression().getLanguage(); + Long matches = filter.getFilteredCount(); + + CompositeData data = new CompositeDataSupport(ct, + new 
String[]{"predicate", "language", "matches"}, + new Object[]{predicate, language, matches}); + answer.put(data); + } + if (getDefinition().getOtherwise() != null) { + CompositeType ct = CamelOpenMBeanTypes.choiceCompositeType(); + String predicate = "otherwise"; + String language = ""; + Long matches = processor.getNotFilteredCount(); + + CompositeData data = new CompositeDataSupport(ct, + new String[]{"predicate", "language", "matches"}, + new Object[]{predicate, language, matches}); + answer.put(data); + } + + return answer; + } catch (Exception e) { + throw ObjectHelper.wrapRuntimeCamelException(e); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/5122afb6/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java index c2a02b1..5117168 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java @@ -48,6 +48,7 @@ public class ChoiceProcessor extends ServiceSupport implements AsyncProcessor, N private String id; private final List<FilterProcessor> filters; private final Processor otherwise; + private transient long notFiltered; public ChoiceProcessor(List<FilterProcessor> filters, Processor otherwise) { this.filters = filters; @@ -89,8 +90,7 @@ public class ChoiceProcessor extends ServiceSupport implements AsyncProcessor, N if (processor instanceof FilterProcessor) { FilterProcessor filter = (FilterProcessor) processor; try { - matches = filter.getPredicate().matches(exchange); - exchange.setProperty(Exchange.FILTER_MATCHED, matches); + matches = filter.matches(exchange); // as we have pre evaluated the predicate then use its processor directly when routing processor = 
filter.getProcessor(); } catch (Throwable e) { @@ -98,6 +98,7 @@ public class ChoiceProcessor extends ServiceSupport implements AsyncProcessor, N } } else { // its the otherwise processor, so its a match + notFiltered++; matches = true; } @@ -154,6 +155,23 @@ public class ChoiceProcessor extends ServiceSupport implements AsyncProcessor, N return otherwise; } + /** + * Gets the number of Exchanges that did not match any predicate and are routed using otherwise + */ + public long getNotFilteredCount() { + return notFiltered; + } + + /** + * Reset counters. + */ + public void reset() { + for (FilterProcessor filter : getFilters()) { + filter.reset(); + } + notFiltered = 0; + } + public List<Processor> next() { if (!hasNext()) { return null; @@ -187,4 +205,5 @@ public class ChoiceProcessor extends ServiceSupport implements AsyncProcessor, N protected void doStop() throws Exception { ServiceHelper.stopServices(otherwise, filters); } + } http://git-wip-us.apache.org/repos/asf/camel/blob/5122afb6/camel-core/src/main/java/org/apache/camel/processor/FilterProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/FilterProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/FilterProcessor.java index 31bdf93..289c157 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/FilterProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/FilterProcessor.java @@ -46,12 +46,25 @@ public class FilterProcessor extends DelegateAsyncProcessor implements Traceable @Override public boolean process(Exchange exchange, AsyncCallback callback) { boolean matches = false; + try { - matches = predicate.matches(exchange); - } catch (Throwable e) { + matches = matches(exchange); + } catch (Exception e) { exchange.setException(e); } + if (matches) { + filtered++; + return processor.process(exchange, callback); + } else { + callback.done(true); + return true; + } + } + 
+ public boolean matches(Exchange exchange) { + boolean matches = predicate.matches(exchange); + LOG.debug("Filter matches: {} for exchange: {}", matches, exchange); // set property whether the filter matches or not @@ -59,11 +72,9 @@ public class FilterProcessor extends DelegateAsyncProcessor implements Traceable if (matches) { filtered++; - return processor.process(exchange, callback); - } else { - callback.done(true); - return true; } + + return matches; } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/5122afb6/camel-core/src/test/java/org/apache/camel/management/ManagedChoiceTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedChoiceTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedChoiceTest.java index 62cdb3b..b0ad788 100644 --- a/camel-core/src/test/java/org/apache/camel/management/ManagedChoiceTest.java +++ b/camel-core/src/test/java/org/apache/camel/management/ManagedChoiceTest.java @@ -35,10 +35,12 @@ public class ManagedChoiceTest extends ManagementTestSupport { return; } - MockEndpoint foo = getMockEndpoint("mock:foo"); - foo.expectedMessageCount(1); + getMockEndpoint("mock:foo").expectedMessageCount(2); + getMockEndpoint("mock:bar").expectedMessageCount(1); template.sendBodyAndHeader("direct:start", "Hello World", "foo", "123"); + template.sendBodyAndHeader("direct:start", "Bye World", "foo", "456"); + template.sendBodyAndHeader("direct:start", "Hi World", "bar", "789"); assertMockEndpointsSatisfied(); @@ -58,7 +60,11 @@ public class ManagedChoiceTest extends ManagementTestSupport { String state = (String) mbeanServer.getAttribute(on, "State"); assertEquals(ServiceStatus.Started.name(), state); - TabularData data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{false}, new String[]{"boolean"}); + TabularData data = (TabularData) mbeanServer.invoke(on, "choiceStatistics", null, null); + 
assertNotNull(data); + assertEquals(2, data.size()); + + data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{false}, new String[]{"boolean"}); assertNotNull(data); assertEquals(3, data.size());