CAMEL-11563: Add predicate option to @Consume so the bean is only called if its 
evaluated to true


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9041c4de
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9041c4de
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9041c4de

Branch: refs/heads/master
Commit: 9041c4de25c599e5f1512b0744adbb9e7ef433a5
Parents: 49c3c02
Author: Claus Ibsen <davscl...@apache.org>
Authored: Sun Jul 23 16:02:58 2017 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Sun Jul 23 16:04:12 2017 +0200

----------------------------------------------------------------------
 .../camel/impl/CamelPostProcessorHelper.java    | 16 +++-
 .../camel/impl/SubscribeMethodProcessor.java    |  6 +-
 ...PostProcessorHelperConsumePredicateTest.java | 20 ++---
 ...ProcessorHelperSedaConsumePredicateTest.java | 85 ++++++++++++++++++++
 4 files changed, 111 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/9041c4de/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java 
b/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java
index 7296b0b..aa7e15e 100644
--- 
a/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java
+++ 
b/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java
@@ -29,6 +29,7 @@ import org.apache.camel.ConsumerTemplate;
 import org.apache.camel.Endpoint;
 import org.apache.camel.FluentProducerTemplate;
 import org.apache.camel.IsSingleton;
+import org.apache.camel.MultipleConsumersSupport;
 import org.apache.camel.NoSuchBeanException;
 import org.apache.camel.PollingConsumer;
 import org.apache.camel.Processor;
@@ -102,9 +103,14 @@ public class CamelPostProcessorHelper implements 
CamelContextAware {
         String injectionPointName = method.getName();
         Endpoint endpoint = getEndpointInjection(bean, endpointUri, 
endpointName, endpointProperty, injectionPointName, true);
         if (endpoint != null) {
+            boolean multipleConsumer = false;
+            if (endpoint instanceof MultipleConsumersSupport) {
+                multipleConsumer = ((MultipleConsumersSupport) 
endpoint).isMultipleConsumersSupported();
+            }
             try {
                 SubscribeMethodProcessor processor = 
getConsumerProcessor(endpoint);
-                if (processor == null) {
+                // if multiple consumer then create a new consumer per 
subscribed method
+                if (multipleConsumer || processor == null) {
                     // create new processor and new consumer which happens the 
first time
                     processor = new SubscribeMethodProcessor(endpoint);
                     // make sure processor is registered in registry so we can 
reuse it (eg we can look it up)
@@ -113,10 +119,14 @@ public class CamelPostProcessorHelper implements 
CamelContextAware {
                     Consumer consumer = endpoint.createConsumer(processor);
                     startService(consumer, endpoint.getCamelContext(), bean, 
beanName);
                 } else {
-                    // add to existing processor
+                    // add method to existing processor
                     processor.addMethod(bean, method, endpoint, predicate);
                 }
-                LOG.debug("Subscribed method: {} to consume from endpoint: {} 
with predicate: {}", method, endpoint, predicate);
+                if (predicate != null) {
+                    LOG.debug("Subscribed method: {} to consume from endpoint: 
{} with predicate: {}", method, endpoint, predicate);
+                } else {
+                    LOG.debug("Subscribed method: {} to consume from endpoint: 
{}", method, endpoint);
+                }
             } catch (Exception e) {
                 throw ObjectHelper.wrapRuntimeCamelException(e);
             }

http://git-wip-us.apache.org/repos/asf/camel/blob/9041c4de/camel-core/src/main/java/org/apache/camel/impl/SubscribeMethodProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/SubscribeMethodProcessor.java 
b/camel-core/src/main/java/org/apache/camel/impl/SubscribeMethodProcessor.java
index 97d953f..bf39a7e 100644
--- 
a/camel-core/src/main/java/org/apache/camel/impl/SubscribeMethodProcessor.java
+++ 
b/camel-core/src/main/java/org/apache/camel/impl/SubscribeMethodProcessor.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

http://git-wip-us.apache.org/repos/asf/camel/blob/9041c4de/camel-core/src/test/java/org/apache/camel/impl/CamelPostProcessorHelperConsumePredicateTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/impl/CamelPostProcessorHelperConsumePredicateTest.java
 
b/camel-core/src/test/java/org/apache/camel/impl/CamelPostProcessorHelperConsumePredicateTest.java
index 3d9f99d..7ae0ffb 100644
--- 
a/camel-core/src/test/java/org/apache/camel/impl/CamelPostProcessorHelperConsumePredicateTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/impl/CamelPostProcessorHelperConsumePredicateTest.java
@@ -39,11 +39,11 @@ public class CamelPostProcessorHelperConsumePredicateTest 
extends ContextTestSup
         getMockEndpoint("mock:low").expectedBodiesReceived("17", "89", "39");
         getMockEndpoint("mock:high").expectedBodiesReceived("219", "112");
 
-        template.sendBody("seda:foo", "17");
-        template.sendBody("seda:foo", "219");
-        template.sendBody("seda:foo", "89");
-        template.sendBody("seda:foo", "112");
-        template.sendBody("seda:foo", "39");
+        template.sendBody("direct:foo", "17");
+        template.sendBody("direct:foo", "219");
+        template.sendBody("direct:foo", "89");
+        template.sendBody("direct:foo", "112");
+        template.sendBody("direct:foo", "39");
 
         assertMockEndpointsSatisfied();
     }
@@ -61,22 +61,22 @@ public class CamelPostProcessorHelperConsumePredicateTest 
extends ContextTestSup
         getMockEndpoint("mock:low").expectedBodiesReceived("17");
         getMockEndpoint("mock:high").expectedBodiesReceived("112");
 
-        template.sendBody("seda:foo", "17");
+        template.sendBody("direct:foo", "17");
         // should be dropped as it does not match any predicates
-        template.sendBody("seda:foo", "-1");
-        template.sendBody("seda:foo", "112");
+        template.sendBody("direct:foo", "-1");
+        template.sendBody("direct:foo", "112");
 
         assertMockEndpointsSatisfied();
     }
 
     public class MyConsumeBean {
 
-        @Consume(uri = "seda:foo", predicate = "${body} >= 0 && ${body} < 100")
+        @Consume(uri = "direct:foo", predicate = "${body} >= 0 && ${body} < 
100")
         public void low(String body) {
             template.sendBody("mock:low", body);
         }
 
-        @Consume(uri = "seda:foo", predicate = "${body} >= 100")
+        @Consume(uri = "direct:foo", predicate = "${body} >= 100")
         public void high(String body) {
             template.sendBody("mock:high", body);
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/9041c4de/camel-core/src/test/java/org/apache/camel/impl/CamelPostProcessorHelperSedaConsumePredicateTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/impl/CamelPostProcessorHelperSedaConsumePredicateTest.java
 
b/camel-core/src/test/java/org/apache/camel/impl/CamelPostProcessorHelperSedaConsumePredicateTest.java
new file mode 100644
index 0000000..ef6e9e1
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/impl/CamelPostProcessorHelperSedaConsumePredicateTest.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.lang.reflect.Method;
+
+import org.apache.camel.Consume;
+import org.apache.camel.ContextTestSupport;
+
+/**
+ * @version 
+ */
+public class CamelPostProcessorHelperSedaConsumePredicateTest extends 
ContextTestSupport {
+
+    public void testConsumePredicate() throws Exception {
+        CamelPostProcessorHelper helper = new 
CamelPostProcessorHelper(context);
+
+        MyConsumeBean my = new MyConsumeBean();
+
+        Method method = my.getClass().getMethod("low", String.class);
+        helper.consumerInjection(method, my, "foo");
+        method = my.getClass().getMethod("high", String.class);
+        helper.consumerInjection(method, my, "foo");
+
+        getMockEndpoint("mock:low").expectedBodiesReceived("17", "89", "39");
+        getMockEndpoint("mock:high").expectedBodiesReceived("219", "112");
+
+        template.sendBody("seda:foo", "17");
+        template.sendBody("seda:foo", "219");
+        template.sendBody("seda:foo", "89");
+        template.sendBody("seda:foo", "112");
+        template.sendBody("seda:foo", "39");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testConsumePredicateDrop() throws Exception {
+        CamelPostProcessorHelper helper = new 
CamelPostProcessorHelper(context);
+
+        MyConsumeBean my = new MyConsumeBean();
+
+        Method method = my.getClass().getMethod("low", String.class);
+        helper.consumerInjection(method, my, "foo");
+        method = my.getClass().getMethod("high", String.class);
+        helper.consumerInjection(method, my, "foo");
+
+        getMockEndpoint("mock:low").expectedBodiesReceived("17");
+        getMockEndpoint("mock:high").expectedBodiesReceived("112");
+
+        template.sendBody("seda:foo", "17");
+        // should be dropped as it does not match any predicates
+        template.sendBody("seda:foo", "-1");
+        template.sendBody("seda:foo", "112");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public class MyConsumeBean {
+
+        @Consume(uri = "seda:foo", predicate = "${body} >= 0 && ${body} < 100")
+        public void low(String body) {
+            template.sendBody("mock:low", body);
+        }
+
+        @Consume(uri = "seda:foo", predicate = "${body} >= 100")
+        public void high(String body) {
+            template.sendBody("mock:high", body);
+        }
+    }
+
+}

Reply via email to