Repository: camel Updated Branches: refs/heads/master 00fda9de9 -> 261ce8d29
CAMEL-11563: Add predicate option to @Consume so the bean is only called if its evaluated to true Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3f69b57a Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3f69b57a Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3f69b57a Branch: refs/heads/master Commit: 3f69b57a5efbdd631d0cf140fb21cc5fea72d9f6 Parents: 00fda9d Author: Claus Ibsen <davscl...@apache.org> Authored: Sun Jul 23 14:27:17 2017 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun Jul 23 14:46:01 2017 +0200 ---------------------------------------------------------------------- .../java/org/apache/camel/CamelContext.java | 8 ++ .../src/main/java/org/apache/camel/Consume.java | 9 ++ .../camel/impl/CamelPostProcessorHelper.java | 39 +++--- .../apache/camel/impl/DefaultCamelContext.java | 11 ++ .../camel/impl/SubscribeMethodProcessor.java | 126 +++++++++++++++++++ ...PostProcessorHelperConsumePredicateTest.java | 85 +++++++++++++ 6 files changed, 258 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/3f69b57a/camel-core/src/main/java/org/apache/camel/CamelContext.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/CamelContext.java b/camel-core/src/main/java/org/apache/camel/CamelContext.java index 98d0d6c..3ccb716 100644 --- a/camel-core/src/main/java/org/apache/camel/CamelContext.java +++ b/camel-core/src/main/java/org/apache/camel/CamelContext.java @@ -306,6 +306,14 @@ public interface CamelContext extends SuspendableService, RuntimeConfiguration { <T> T hasService(Class<T> type); /** + * Has the given service type already been added to this CamelContext? + * + * @param type the class type + * @return the services instance or empty set. + */ + <T> Set<T> hasServices(Class<T> type); + + /** * Defers starting the service until {@link CamelContext} is (almost started) or started and has initialized all its prior services and routes. * <p/> * If {@link CamelContext} is already started then the service is started immediately. http://git-wip-us.apache.org/repos/asf/camel/blob/3f69b57a/camel-core/src/main/java/org/apache/camel/Consume.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/Consume.java b/camel-core/src/main/java/org/apache/camel/Consume.java index c0e6ff8..175e293 100644 --- a/camel-core/src/main/java/org/apache/camel/Consume.java +++ b/camel-core/src/main/java/org/apache/camel/Consume.java @@ -57,4 +57,13 @@ public @interface Consume { * Id of {@link CamelContext} to use */ String context() default ""; + + /** + * Optional predicate (using simple language) to only consume if the predicate matches . + * This can be used to filter messages. + * <p/> + * Notice that only the first method that matches the predicate will be used. + * And if no predicate matches then the message is dropped. + */ + String predicate() default ""; } http://git-wip-us.apache.org/repos/asf/camel/blob/3f69b57a/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java b/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java index a9063cf..7296b0b 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java +++ b/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java @@ -37,10 +37,7 @@ import org.apache.camel.ProducerTemplate; import org.apache.camel.ProxyInstantiationException; import org.apache.camel.Service; import org.apache.camel.builder.DefaultFluentProducerTemplate; -import org.apache.camel.component.bean.BeanInfo; -import org.apache.camel.component.bean.BeanProcessor; import org.apache.camel.component.bean.ProxyHelper; -import org.apache.camel.processor.CamelInternalProcessor; import org.apache.camel.processor.DeferServiceFactory; import org.apache.camel.processor.UnitOfWorkProducer; import org.apache.camel.util.CamelContextHelper; @@ -96,20 +93,30 @@ public class CamelPostProcessorHelper implements CamelContextAware { Consume consume = method.getAnnotation(Consume.class); if (consume != null && matchContext(consume.context())) { LOG.debug("Creating a consumer for: " + consume); - subscribeMethod(method, bean, beanName, consume.uri(), consume.ref(), consume.property()); + subscribeMethod(method, bean, beanName, consume.uri(), consume.ref(), consume.property(), consume.predicate()); } } - public void subscribeMethod(Method method, Object bean, String beanName, String endpointUri, String endpointName, String endpointProperty) { + public void subscribeMethod(Method method, Object bean, String beanName, String endpointUri, String endpointName, String endpointProperty, String predicate) { // lets bind this method to a listener String injectionPointName = method.getName(); Endpoint endpoint = getEndpointInjection(bean, endpointUri, endpointName, endpointProperty, injectionPointName, true); if (endpoint != null) { try { - Processor processor = createConsumerProcessor(bean, method, endpoint); - Consumer consumer = endpoint.createConsumer(processor); - LOG.debug("Created processor: {} for consumer: {}", processor, consumer); - startService(consumer, endpoint.getCamelContext(), bean, beanName); + SubscribeMethodProcessor processor = getConsumerProcessor(endpoint); + if (processor == null) { + // create new processor and new consumer which happens the first time + processor = new SubscribeMethodProcessor(endpoint); + // make sure processor is registered in registry so we can reuse it (eg we can look it up) + endpoint.getCamelContext().addService(processor, true); + processor.addMethod(bean, method, endpoint, predicate); + Consumer consumer = endpoint.createConsumer(processor); + startService(consumer, endpoint.getCamelContext(), bean, beanName); + } else { + // add to existing processor + processor.addMethod(bean, method, endpoint, predicate); + } + LOG.debug("Subscribed method: {} to consume from endpoint: {} with predicate: {}", method, endpoint, predicate); } catch (Exception e) { throw ObjectHelper.wrapRuntimeCamelException(e); } @@ -134,17 +141,9 @@ public class CamelPostProcessorHelper implements CamelContextAware { } } - /** - * Create a processor which invokes the given method when an incoming - * message exchange is received - */ - protected Processor createConsumerProcessor(final Object pojo, final Method method, final Endpoint endpoint) { - BeanInfo info = new BeanInfo(getCamelContext(), method); - BeanProcessor answer = new BeanProcessor(pojo, info); - // must ensure the consumer is being executed in an unit of work so synchronization callbacks etc is invoked - CamelInternalProcessor internal = new CamelInternalProcessor(answer); - internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(null)); - return internal; + protected SubscribeMethodProcessor getConsumerProcessor(Endpoint endpoint) { + Set<SubscribeMethodProcessor> processors = endpoint.getCamelContext().hasServices(SubscribeMethodProcessor.class); + return processors.stream().filter(s -> s.getEndpoint() == endpoint).findFirst().orElse(null); } public Endpoint getEndpointInjection(Object bean, String uri, String name, String propertyName, http://git-wip-us.apache.org/repos/asf/camel/blob/3f69b57a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java index bf7cb59..061263a 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java @@ -1512,6 +1512,17 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon return null; } + @Override + public <T> Set<T> hasServices(Class<T> type) { + Set<T> set = new HashSet<>(); + for (Service service : servicesToStop) { + if (type.isInstance(service)) { + set.add((T) service); + } + } + return set; + } + public void deferStartService(Object object, boolean stopOnShutdown) throws Exception { if (object instanceof Service) { Service service = (Service) object; http://git-wip-us.apache.org/repos/asf/camel/blob/3f69b57a/camel-core/src/main/java/org/apache/camel/impl/SubscribeMethodProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/SubscribeMethodProcessor.java b/camel-core/src/main/java/org/apache/camel/impl/SubscribeMethodProcessor.java new file mode 100644 index 0000000..97d953f --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/impl/SubscribeMethodProcessor.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl; + +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.AsyncProcessor; +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.Navigate; +import org.apache.camel.Predicate; +import org.apache.camel.Processor; +import org.apache.camel.builder.PredicateBuilder; +import org.apache.camel.component.bean.BeanInfo; +import org.apache.camel.component.bean.BeanProcessor; +import org.apache.camel.processor.CamelInternalProcessor; +import org.apache.camel.support.ServiceSupport; +import org.apache.camel.util.AsyncProcessorHelper; +import org.apache.camel.util.ObjectHelper; +import org.apache.camel.util.ServiceHelper; + +/** + * A {@link Processor} which is used for POJO @Consume where you can have multiple @Consume on the same endpoint/consumer + * and via predicate's can filter and call different methods. + */ +public final class SubscribeMethodProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor> { + + private final Endpoint endpoint; + private final Map<AsyncProcessor, Predicate> methods = new LinkedHashMap<>(); + + public SubscribeMethodProcessor(Endpoint endpoint) { + this.endpoint = endpoint; + } + + public Endpoint getEndpoint() { + return endpoint; + } + + protected void addMethod(final Object pojo, final Method method, final Endpoint endpoint, String predicate) { + BeanInfo info = new BeanInfo(endpoint.getCamelContext(), method); + BeanProcessor answer = new BeanProcessor(pojo, info); + // must ensure the consumer is being executed in an unit of work so synchronization callbacks etc is invoked + CamelInternalProcessor internal = new CamelInternalProcessor(answer); + internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(null)); + + Predicate p; + if (ObjectHelper.isEmpty(predicate)) { + p = PredicateBuilder.constant(true); + } else { + p = endpoint.getCamelContext().resolveLanguage("simple").createPredicate(predicate); + } + methods.put(internal, p); + } + + @Override + public void process(Exchange exchange) throws Exception { + AsyncProcessorHelper.process(this, exchange); + } + + @Override + public boolean process(Exchange exchange, AsyncCallback callback) { + try { + // evaluate which predicate matches and call the method + for (Map.Entry<AsyncProcessor, Predicate> entry : methods.entrySet()) { + Predicate predicate = entry.getValue(); + if (predicate.matches(exchange)) { + return entry.getKey().process(exchange, callback); + } + } + } catch (Throwable e) { + exchange.setException(e); + return true; + } + + return true; + } + + @Override + protected void doStart() throws Exception { + ServiceHelper.startServices(methods.keySet()); + } + + @Override + protected void doStop() throws Exception { + ServiceHelper.stopServices(methods.keySet()); + } + + @Override + protected void doShutdown() throws Exception { + ServiceHelper.stopAndShutdownServices(methods.keySet()); + } + + @Override + public String toString() { + return "SubscribeMethodProcessor[" + endpoint + "]"; + } + + @Override + public List<Processor> next() { + return new ArrayList<>(methods.keySet()); + } + + @Override + public boolean hasNext() { + return !methods.isEmpty(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/3f69b57a/camel-core/src/test/java/org/apache/camel/impl/CamelPostProcessorHelperConsumePredicateTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/impl/CamelPostProcessorHelperConsumePredicateTest.java b/camel-core/src/test/java/org/apache/camel/impl/CamelPostProcessorHelperConsumePredicateTest.java new file mode 100644 index 0000000..3d9f99d --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/impl/CamelPostProcessorHelperConsumePredicateTest.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl; + +import java.lang.reflect.Method; + +import org.apache.camel.Consume; +import org.apache.camel.ContextTestSupport; + +/** + * @version + */ +public class CamelPostProcessorHelperConsumePredicateTest extends ContextTestSupport { + + public void testConsumePredicate() throws Exception { + CamelPostProcessorHelper helper = new CamelPostProcessorHelper(context); + + MyConsumeBean my = new MyConsumeBean(); + + Method method = my.getClass().getMethod("low", String.class); + helper.consumerInjection(method, my, "foo"); + method = my.getClass().getMethod("high", String.class); + helper.consumerInjection(method, my, "foo"); + + getMockEndpoint("mock:low").expectedBodiesReceived("17", "89", "39"); + getMockEndpoint("mock:high").expectedBodiesReceived("219", "112"); + + template.sendBody("seda:foo", "17"); + template.sendBody("seda:foo", "219"); + template.sendBody("seda:foo", "89"); + template.sendBody("seda:foo", "112"); + template.sendBody("seda:foo", "39"); + + assertMockEndpointsSatisfied(); + } + + public void testConsumePredicateDrop() throws Exception { + CamelPostProcessorHelper helper = new CamelPostProcessorHelper(context); + + MyConsumeBean my = new MyConsumeBean(); + + Method method = my.getClass().getMethod("low", String.class); + helper.consumerInjection(method, my, "foo"); + method = my.getClass().getMethod("high", String.class); + helper.consumerInjection(method, my, "foo"); + + getMockEndpoint("mock:low").expectedBodiesReceived("17"); + getMockEndpoint("mock:high").expectedBodiesReceived("112"); + + template.sendBody("seda:foo", "17"); + // should be dropped as it does not match any predicates + template.sendBody("seda:foo", "-1"); + template.sendBody("seda:foo", "112"); + + assertMockEndpointsSatisfied(); + } + + public class MyConsumeBean { + + @Consume(uri = "seda:foo", predicate = "${body} >= 0 && ${body} < 100") + public void low(String body) { + template.sendBody("mock:low", body); + } + + @Consume(uri = "seda:foo", predicate = "${body} >= 100") + public void high(String body) { + template.sendBody("mock:high", body); + } + } + +}