Repository: camel
Updated Branches:
  refs/heads/master 00fda9de9 -> 261ce8d29


CAMEL-11563: Add predicate option to @Consume so the bean is only called if its 
evaluated to true


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3f69b57a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3f69b57a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3f69b57a

Branch: refs/heads/master
Commit: 3f69b57a5efbdd631d0cf140fb21cc5fea72d9f6
Parents: 00fda9d
Author: Claus Ibsen <davscl...@apache.org>
Authored: Sun Jul 23 14:27:17 2017 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Sun Jul 23 14:46:01 2017 +0200

----------------------------------------------------------------------
 .../java/org/apache/camel/CamelContext.java     |   8 ++
 .../src/main/java/org/apache/camel/Consume.java |   9 ++
 .../camel/impl/CamelPostProcessorHelper.java    |  39 +++---
 .../apache/camel/impl/DefaultCamelContext.java  |  11 ++
 .../camel/impl/SubscribeMethodProcessor.java    | 126 +++++++++++++++++++
 ...PostProcessorHelperConsumePredicateTest.java |  85 +++++++++++++
 6 files changed, 258 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/3f69b57a/camel-core/src/main/java/org/apache/camel/CamelContext.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/CamelContext.java 
b/camel-core/src/main/java/org/apache/camel/CamelContext.java
index 98d0d6c..3ccb716 100644
--- a/camel-core/src/main/java/org/apache/camel/CamelContext.java
+++ b/camel-core/src/main/java/org/apache/camel/CamelContext.java
@@ -306,6 +306,14 @@ public interface CamelContext extends SuspendableService, 
RuntimeConfiguration {
     <T> T hasService(Class<T> type);
 
     /**
+     * Has the given service type already been added to this CamelContext?
+     *
+     * @param type the class type
+     * @return the services instance or empty set.
+     */
+    <T> Set<T> hasServices(Class<T> type);
+
+    /**
      * Defers starting the service until {@link CamelContext} is (almost 
started) or started and has initialized all its prior services and routes.
      * <p/>
      * If {@link CamelContext} is already started then the service is started 
immediately.

http://git-wip-us.apache.org/repos/asf/camel/blob/3f69b57a/camel-core/src/main/java/org/apache/camel/Consume.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/Consume.java 
b/camel-core/src/main/java/org/apache/camel/Consume.java
index c0e6ff8..175e293 100644
--- a/camel-core/src/main/java/org/apache/camel/Consume.java
+++ b/camel-core/src/main/java/org/apache/camel/Consume.java
@@ -57,4 +57,13 @@ public @interface Consume {
      * Id of {@link CamelContext} to use
      */
     String context() default "";
+
+    /**
+     * Optional predicate (using simple language) to only consume if the 
predicate matches .
+     * This can be used to filter messages.
+     * <p/>
+     * Notice that only the first method that matches the predicate will be 
used.
+     * And if no predicate matches then the message is dropped.
+     */
+    String predicate() default "";
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/3f69b57a/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java 
b/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java
index a9063cf..7296b0b 100644
--- 
a/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java
+++ 
b/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java
@@ -37,10 +37,7 @@ import org.apache.camel.ProducerTemplate;
 import org.apache.camel.ProxyInstantiationException;
 import org.apache.camel.Service;
 import org.apache.camel.builder.DefaultFluentProducerTemplate;
-import org.apache.camel.component.bean.BeanInfo;
-import org.apache.camel.component.bean.BeanProcessor;
 import org.apache.camel.component.bean.ProxyHelper;
-import org.apache.camel.processor.CamelInternalProcessor;
 import org.apache.camel.processor.DeferServiceFactory;
 import org.apache.camel.processor.UnitOfWorkProducer;
 import org.apache.camel.util.CamelContextHelper;
@@ -96,20 +93,30 @@ public class CamelPostProcessorHelper implements 
CamelContextAware {
         Consume consume = method.getAnnotation(Consume.class);
         if (consume != null && matchContext(consume.context())) {
             LOG.debug("Creating a consumer for: " + consume);
-            subscribeMethod(method, bean, beanName, consume.uri(), 
consume.ref(), consume.property());
+            subscribeMethod(method, bean, beanName, consume.uri(), 
consume.ref(), consume.property(), consume.predicate());
         }
     }
 
-    public void subscribeMethod(Method method, Object bean, String beanName, 
String endpointUri, String endpointName, String endpointProperty) {
+    public void subscribeMethod(Method method, Object bean, String beanName, 
String endpointUri, String endpointName, String endpointProperty, String 
predicate) {
         // lets bind this method to a listener
         String injectionPointName = method.getName();
         Endpoint endpoint = getEndpointInjection(bean, endpointUri, 
endpointName, endpointProperty, injectionPointName, true);
         if (endpoint != null) {
             try {
-                Processor processor = createConsumerProcessor(bean, method, 
endpoint);
-                Consumer consumer = endpoint.createConsumer(processor);
-                LOG.debug("Created processor: {} for consumer: {}", processor, 
consumer);
-                startService(consumer, endpoint.getCamelContext(), bean, 
beanName);
+                SubscribeMethodProcessor processor = 
getConsumerProcessor(endpoint);
+                if (processor == null) {
+                    // create new processor and new consumer which happens the 
first time
+                    processor = new SubscribeMethodProcessor(endpoint);
+                    // make sure processor is registered in registry so we can 
reuse it (eg we can look it up)
+                    endpoint.getCamelContext().addService(processor, true);
+                    processor.addMethod(bean, method, endpoint, predicate);
+                    Consumer consumer = endpoint.createConsumer(processor);
+                    startService(consumer, endpoint.getCamelContext(), bean, 
beanName);
+                } else {
+                    // add to existing processor
+                    processor.addMethod(bean, method, endpoint, predicate);
+                }
+                LOG.debug("Subscribed method: {} to consume from endpoint: {} 
with predicate: {}", method, endpoint, predicate);
             } catch (Exception e) {
                 throw ObjectHelper.wrapRuntimeCamelException(e);
             }
@@ -134,17 +141,9 @@ public class CamelPostProcessorHelper implements 
CamelContextAware {
         }
     }
 
-    /**
-     * Create a processor which invokes the given method when an incoming
-     * message exchange is received
-     */
-    protected Processor createConsumerProcessor(final Object pojo, final 
Method method, final Endpoint endpoint) {
-        BeanInfo info = new BeanInfo(getCamelContext(), method);
-        BeanProcessor answer = new BeanProcessor(pojo, info);
-        // must ensure the consumer is being executed in an unit of work so 
synchronization callbacks etc is invoked
-        CamelInternalProcessor internal = new CamelInternalProcessor(answer);
-        internal.addAdvice(new 
CamelInternalProcessor.UnitOfWorkProcessorAdvice(null));
-        return internal;
+    protected SubscribeMethodProcessor getConsumerProcessor(Endpoint endpoint) 
{
+        Set<SubscribeMethodProcessor> processors = 
endpoint.getCamelContext().hasServices(SubscribeMethodProcessor.class);
+        return processors.stream().filter(s -> s.getEndpoint() == 
endpoint).findFirst().orElse(null);
     }
 
     public Endpoint getEndpointInjection(Object bean, String uri, String name, 
String propertyName,

http://git-wip-us.apache.org/repos/asf/camel/blob/3f69b57a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java 
b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
index bf7cb59..061263a 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
@@ -1512,6 +1512,17 @@ public class DefaultCamelContext extends ServiceSupport 
implements ModelCamelCon
         return null;
     }
 
+    @Override
+    public <T> Set<T> hasServices(Class<T> type) {
+        Set<T> set = new HashSet<>();
+        for (Service service : servicesToStop) {
+            if (type.isInstance(service)) {
+                set.add((T) service);
+            }
+        }
+        return set;
+    }
+
     public void deferStartService(Object object, boolean stopOnShutdown) 
throws Exception {
         if (object instanceof Service) {
             Service service = (Service) object;

http://git-wip-us.apache.org/repos/asf/camel/blob/3f69b57a/camel-core/src/main/java/org/apache/camel/impl/SubscribeMethodProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/SubscribeMethodProcessor.java 
b/camel-core/src/main/java/org/apache/camel/impl/SubscribeMethodProcessor.java
new file mode 100644
index 0000000..97d953f
--- /dev/null
+++ 
b/camel-core/src/main/java/org/apache/camel/impl/SubscribeMethodProcessor.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Navigate;
+import org.apache.camel.Predicate;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.PredicateBuilder;
+import org.apache.camel.component.bean.BeanInfo;
+import org.apache.camel.component.bean.BeanProcessor;
+import org.apache.camel.processor.CamelInternalProcessor;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.AsyncProcessorHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
+
+/**
+ * A {@link Processor} which is used for POJO @Consume where you can have 
multiple @Consume on the same endpoint/consumer
+ * and via predicate's can filter and call different methods.
+ */
+public final class SubscribeMethodProcessor extends ServiceSupport implements 
AsyncProcessor, Navigate<Processor> {
+
+    private final Endpoint endpoint;
+    private final Map<AsyncProcessor, Predicate> methods = new 
LinkedHashMap<>();
+
+    public SubscribeMethodProcessor(Endpoint endpoint) {
+        this.endpoint = endpoint;
+    }
+
+    public Endpoint getEndpoint() {
+        return endpoint;
+    }
+
+    protected void addMethod(final Object pojo, final Method method, final 
Endpoint endpoint, String predicate) {
+        BeanInfo info = new BeanInfo(endpoint.getCamelContext(), method);
+        BeanProcessor answer = new BeanProcessor(pojo, info);
+        // must ensure the consumer is being executed in an unit of work so 
synchronization callbacks etc is invoked
+        CamelInternalProcessor internal = new CamelInternalProcessor(answer);
+        internal.addAdvice(new 
CamelInternalProcessor.UnitOfWorkProcessorAdvice(null));
+
+        Predicate p;
+        if (ObjectHelper.isEmpty(predicate)) {
+            p = PredicateBuilder.constant(true);
+        } else {
+            p = 
endpoint.getCamelContext().resolveLanguage("simple").createPredicate(predicate);
+        }
+        methods.put(internal, p);
+    }
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        AsyncProcessorHelper.process(this, exchange);
+    }
+
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        try {
+            // evaluate which predicate matches and call the method
+            for (Map.Entry<AsyncProcessor, Predicate> entry : 
methods.entrySet()) {
+                Predicate predicate = entry.getValue();
+                if (predicate.matches(exchange)) {
+                    return entry.getKey().process(exchange, callback);
+                }
+            }
+        } catch (Throwable e) {
+            exchange.setException(e);
+            return true;
+        }
+
+        return true;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        ServiceHelper.startServices(methods.keySet());
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        ServiceHelper.stopServices(methods.keySet());
+    }
+
+    @Override
+    protected void doShutdown() throws Exception {
+        ServiceHelper.stopAndShutdownServices(methods.keySet());
+    }
+
+    @Override
+    public String toString() {
+        return "SubscribeMethodProcessor[" + endpoint + "]";
+    }
+
+    @Override
+    public List<Processor> next() {
+        return new ArrayList<>(methods.keySet());
+    }
+
+    @Override
+    public boolean hasNext() {
+        return !methods.isEmpty();
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/3f69b57a/camel-core/src/test/java/org/apache/camel/impl/CamelPostProcessorHelperConsumePredicateTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/impl/CamelPostProcessorHelperConsumePredicateTest.java
 
b/camel-core/src/test/java/org/apache/camel/impl/CamelPostProcessorHelperConsumePredicateTest.java
new file mode 100644
index 0000000..3d9f99d
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/impl/CamelPostProcessorHelperConsumePredicateTest.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.lang.reflect.Method;
+
+import org.apache.camel.Consume;
+import org.apache.camel.ContextTestSupport;
+
+/**
+ * @version 
+ */
+public class CamelPostProcessorHelperConsumePredicateTest extends 
ContextTestSupport {
+
+    public void testConsumePredicate() throws Exception {
+        CamelPostProcessorHelper helper = new 
CamelPostProcessorHelper(context);
+
+        MyConsumeBean my = new MyConsumeBean();
+
+        Method method = my.getClass().getMethod("low", String.class);
+        helper.consumerInjection(method, my, "foo");
+        method = my.getClass().getMethod("high", String.class);
+        helper.consumerInjection(method, my, "foo");
+
+        getMockEndpoint("mock:low").expectedBodiesReceived("17", "89", "39");
+        getMockEndpoint("mock:high").expectedBodiesReceived("219", "112");
+
+        template.sendBody("seda:foo", "17");
+        template.sendBody("seda:foo", "219");
+        template.sendBody("seda:foo", "89");
+        template.sendBody("seda:foo", "112");
+        template.sendBody("seda:foo", "39");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testConsumePredicateDrop() throws Exception {
+        CamelPostProcessorHelper helper = new 
CamelPostProcessorHelper(context);
+
+        MyConsumeBean my = new MyConsumeBean();
+
+        Method method = my.getClass().getMethod("low", String.class);
+        helper.consumerInjection(method, my, "foo");
+        method = my.getClass().getMethod("high", String.class);
+        helper.consumerInjection(method, my, "foo");
+
+        getMockEndpoint("mock:low").expectedBodiesReceived("17");
+        getMockEndpoint("mock:high").expectedBodiesReceived("112");
+
+        template.sendBody("seda:foo", "17");
+        // should be dropped as it does not match any predicates
+        template.sendBody("seda:foo", "-1");
+        template.sendBody("seda:foo", "112");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public class MyConsumeBean {
+
+        @Consume(uri = "seda:foo", predicate = "${body} >= 0 && ${body} < 100")
+        public void low(String body) {
+            template.sendBody("mock:low", body);
+        }
+
+        @Consume(uri = "seda:foo", predicate = "${body} >= 100")
+        public void high(String body) {
+            template.sendBody("mock:high", body);
+        }
+    }
+
+}

Reply via email to