CAMEL-11563: Add predicate option to @Consume so the bean is only called if its evaluated to true
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9041c4de Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9041c4de Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9041c4de Branch: refs/heads/master Commit: 9041c4de25c599e5f1512b0744adbb9e7ef433a5 Parents: 49c3c02 Author: Claus Ibsen <davscl...@apache.org> Authored: Sun Jul 23 16:02:58 2017 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun Jul 23 16:04:12 2017 +0200 ---------------------------------------------------------------------- .../camel/impl/CamelPostProcessorHelper.java | 16 +++- .../camel/impl/SubscribeMethodProcessor.java | 6 +- ...PostProcessorHelperConsumePredicateTest.java | 20 ++--- ...ProcessorHelperSedaConsumePredicateTest.java | 85 ++++++++++++++++++++ 4 files changed, 111 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/9041c4de/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java b/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java index 7296b0b..aa7e15e 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java +++ b/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java @@ -29,6 +29,7 @@ import org.apache.camel.ConsumerTemplate; import org.apache.camel.Endpoint; import org.apache.camel.FluentProducerTemplate; import org.apache.camel.IsSingleton; +import org.apache.camel.MultipleConsumersSupport; import org.apache.camel.NoSuchBeanException; import org.apache.camel.PollingConsumer; import org.apache.camel.Processor; @@ -102,9 +103,14 @@ public class CamelPostProcessorHelper implements CamelContextAware { String injectionPointName = method.getName(); Endpoint endpoint = getEndpointInjection(bean, endpointUri, endpointName, endpointProperty, injectionPointName, true); if (endpoint != null) { + boolean multipleConsumer = false; + if (endpoint instanceof MultipleConsumersSupport) { + multipleConsumer = ((MultipleConsumersSupport) endpoint).isMultipleConsumersSupported(); + } try { SubscribeMethodProcessor processor = getConsumerProcessor(endpoint); - if (processor == null) { + // if multiple consumer then create a new consumer per subscribed method + if (multipleConsumer || processor == null) { // create new processor and new consumer which happens the first time processor = new SubscribeMethodProcessor(endpoint); // make sure processor is registered in registry so we can reuse it (eg we can look it up) @@ -113,10 +119,14 @@ public class CamelPostProcessorHelper implements CamelContextAware { Consumer consumer = endpoint.createConsumer(processor); startService(consumer, endpoint.getCamelContext(), bean, beanName); } else { - // add to existing processor + // add method to existing processor processor.addMethod(bean, method, endpoint, predicate); } - LOG.debug("Subscribed method: {} to consume from endpoint: {} with predicate: {}", method, endpoint, predicate); + if (predicate != null) { + LOG.debug("Subscribed method: {} to consume from endpoint: {} with predicate: {}", method, endpoint, predicate); + } else { + LOG.debug("Subscribed method: {} to consume from endpoint: {}", method, endpoint); + } } catch (Exception e) { throw ObjectHelper.wrapRuntimeCamelException(e); } http://git-wip-us.apache.org/repos/asf/camel/blob/9041c4de/camel-core/src/main/java/org/apache/camel/impl/SubscribeMethodProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/SubscribeMethodProcessor.java b/camel-core/src/main/java/org/apache/camel/impl/SubscribeMethodProcessor.java index 97d953f..bf39a7e 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/SubscribeMethodProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/impl/SubscribeMethodProcessor.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. http://git-wip-us.apache.org/repos/asf/camel/blob/9041c4de/camel-core/src/test/java/org/apache/camel/impl/CamelPostProcessorHelperConsumePredicateTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/impl/CamelPostProcessorHelperConsumePredicateTest.java b/camel-core/src/test/java/org/apache/camel/impl/CamelPostProcessorHelperConsumePredicateTest.java index 3d9f99d..7ae0ffb 100644 --- a/camel-core/src/test/java/org/apache/camel/impl/CamelPostProcessorHelperConsumePredicateTest.java +++ b/camel-core/src/test/java/org/apache/camel/impl/CamelPostProcessorHelperConsumePredicateTest.java @@ -39,11 +39,11 @@ public class CamelPostProcessorHelperConsumePredicateTest extends ContextTestSup getMockEndpoint("mock:low").expectedBodiesReceived("17", "89", "39"); getMockEndpoint("mock:high").expectedBodiesReceived("219", "112"); - template.sendBody("seda:foo", "17"); - template.sendBody("seda:foo", "219"); - template.sendBody("seda:foo", "89"); - template.sendBody("seda:foo", "112"); - template.sendBody("seda:foo", "39"); + template.sendBody("direct:foo", "17"); + template.sendBody("direct:foo", "219"); + template.sendBody("direct:foo", "89"); + template.sendBody("direct:foo", "112"); + template.sendBody("direct:foo", "39"); assertMockEndpointsSatisfied(); } @@ -61,22 +61,22 @@ public class CamelPostProcessorHelperConsumePredicateTest extends ContextTestSup getMockEndpoint("mock:low").expectedBodiesReceived("17"); getMockEndpoint("mock:high").expectedBodiesReceived("112"); - template.sendBody("seda:foo", "17"); + template.sendBody("direct:foo", "17"); // should be dropped as it does not match any predicates - template.sendBody("seda:foo", "-1"); - template.sendBody("seda:foo", "112"); + template.sendBody("direct:foo", "-1"); + template.sendBody("direct:foo", "112"); assertMockEndpointsSatisfied(); } public class MyConsumeBean { - @Consume(uri = "seda:foo", predicate = "${body} >= 0 && ${body} < 100") + @Consume(uri = "direct:foo", predicate = "${body} >= 0 && ${body} < 100") public void low(String body) { template.sendBody("mock:low", body); } - @Consume(uri = "seda:foo", predicate = "${body} >= 100") + @Consume(uri = "direct:foo", predicate = "${body} >= 100") public void high(String body) { template.sendBody("mock:high", body); } http://git-wip-us.apache.org/repos/asf/camel/blob/9041c4de/camel-core/src/test/java/org/apache/camel/impl/CamelPostProcessorHelperSedaConsumePredicateTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/impl/CamelPostProcessorHelperSedaConsumePredicateTest.java b/camel-core/src/test/java/org/apache/camel/impl/CamelPostProcessorHelperSedaConsumePredicateTest.java new file mode 100644 index 0000000..ef6e9e1 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/impl/CamelPostProcessorHelperSedaConsumePredicateTest.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl; + +import java.lang.reflect.Method; + +import org.apache.camel.Consume; +import org.apache.camel.ContextTestSupport; + +/** + * @version + */ +public class CamelPostProcessorHelperSedaConsumePredicateTest extends ContextTestSupport { + + public void testConsumePredicate() throws Exception { + CamelPostProcessorHelper helper = new CamelPostProcessorHelper(context); + + MyConsumeBean my = new MyConsumeBean(); + + Method method = my.getClass().getMethod("low", String.class); + helper.consumerInjection(method, my, "foo"); + method = my.getClass().getMethod("high", String.class); + helper.consumerInjection(method, my, "foo"); + + getMockEndpoint("mock:low").expectedBodiesReceived("17", "89", "39"); + getMockEndpoint("mock:high").expectedBodiesReceived("219", "112"); + + template.sendBody("seda:foo", "17"); + template.sendBody("seda:foo", "219"); + template.sendBody("seda:foo", "89"); + template.sendBody("seda:foo", "112"); + template.sendBody("seda:foo", "39"); + + assertMockEndpointsSatisfied(); + } + + public void testConsumePredicateDrop() throws Exception { + CamelPostProcessorHelper helper = new CamelPostProcessorHelper(context); + + MyConsumeBean my = new MyConsumeBean(); + + Method method = my.getClass().getMethod("low", String.class); + helper.consumerInjection(method, my, "foo"); + method = my.getClass().getMethod("high", String.class); + helper.consumerInjection(method, my, "foo"); + + getMockEndpoint("mock:low").expectedBodiesReceived("17"); + getMockEndpoint("mock:high").expectedBodiesReceived("112"); + + template.sendBody("seda:foo", "17"); + // should be dropped as it does not match any predicates + template.sendBody("seda:foo", "-1"); + template.sendBody("seda:foo", "112"); + + assertMockEndpointsSatisfied(); + } + + public class MyConsumeBean { + + @Consume(uri = "seda:foo", predicate = "${body} >= 0 && ${body} < 100") + public void low(String body) { + template.sendBody("mock:low", body); + } + + @Consume(uri = "seda:foo", predicate = "${body} >= 100") + public void high(String body) { + template.sendBody("mock:high", body); + } + } + +}