CAMEL-6650: AggregationStrategy - Allow to use a pojo with no Camel API dependencies.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/bbc1a4df Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/bbc1a4df Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/bbc1a4df Branch: refs/heads/master Commit: bbc1a4df022c699917b2589cd0c2965448e1493d Parents: 36f6ba1 Author: Claus Ibsen <davscl...@apache.org> Authored: Tue Aug 20 13:14:29 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Tue Aug 20 13:46:38 2013 +0200 ---------------------------------------------------------------------- .../apache/camel/model/AggregateDefinition.java | 27 ++++++- .../apache/camel/model/EnrichDefinition.java | 17 +++- .../apache/camel/model/MulticastDefinition.java | 27 ++++++- .../camel/model/PollEnrichDefinition.java | 17 +++- .../camel/model/RecipientListDefinition.java | 27 ++++++- .../org/apache/camel/model/SplitDefinition.java | 27 ++++++- ...egationStrategyBeanAdapterAllowNullTest.java | 81 ++++++++++++++++++++ ...egationStrategyBeanAdapterAllowNullTest.java | 32 ++++++++ ...regationStrategyBeanAdapterAllowNullTest.xml | 42 ++++++++++ 9 files changed, 291 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/bbc1a4df/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java index 709fe92..0af2206 100644 --- a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java @@ -91,6 +91,8 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition @XmlAttribute private String strategyMethodName; @XmlAttribute + private Boolean strategyMethodAllowNull; + @XmlAttribute private Integer completionSize; @XmlAttribute private Long completionInterval; @@ -284,7 +286,12 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition if (aggStrategy instanceof AggregationStrategy) { strategy = (AggregationStrategy) aggStrategy; } else if (aggStrategy != null) { - strategy = new AggregationStrategyBeanAdapter(aggStrategy, getAggregationStrategyMethodName()); + AggregationStrategyBeanAdapter adapter = new AggregationStrategyBeanAdapter(aggStrategy, getAggregationStrategyMethodName()); + if (getStrategyMethodAllowNull() != null) { + adapter.setAllowNullNewExchange(getStrategyMethodAllowNull()); + adapter.setAllowNullOldExchange(getStrategyMethodAllowNull()); + } + strategy = adapter; } else { throw new IllegalArgumentException("Cannot find AggregationStrategy in Registry with name: " + strategyRef); } @@ -346,6 +353,14 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition this.strategyMethodName = strategyMethodName; } + public Boolean getStrategyMethodAllowNull() { + return strategyMethodAllowNull; + } + + public void setStrategyMethodAllowNull(Boolean strategyMethodAllowNull) { + this.strategyMethodAllowNull = strategyMethodAllowNull; + } + public Integer getCompletionSize() { return completionSize; } @@ -723,6 +738,16 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition } /** + * Sets allowing null when using a POJO as {@link AggregationStrategy}. + * + * @return the builder + */ + public AggregateDefinition aggregationStrategyMethodAllowNull() { + setStrategyMethodAllowNull(true); + return this; + } + + /** * Sets the custom aggregate repository to use. * <p/> * Will by default use {@link org.apache.camel.processor.aggregate.MemoryAggregationRepository} http://git-wip-us.apache.org/repos/asf/camel/blob/bbc1a4df/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java b/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java index 88f79d8..9cb88bf 100644 --- a/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java @@ -48,6 +48,8 @@ public class EnrichDefinition extends NoOutputDefinition<EnrichDefinition> { private String aggregationStrategyRef; @XmlAttribute(name = "strategyMethodName") private String aggregationStrategyMethodName; + @XmlAttribute(name = "strategyMethodAllowNull") + private Boolean aggregationStrategyMethodAllowNull; @XmlTransient private AggregationStrategy aggregationStrategy; @@ -114,7 +116,12 @@ public class EnrichDefinition extends NoOutputDefinition<EnrichDefinition> { if (aggStrategy instanceof AggregationStrategy) { strategy = (AggregationStrategy) aggStrategy; } else if (aggStrategy != null) { - strategy = new AggregationStrategyBeanAdapter(aggStrategy, getAggregationStrategyMethodName()); + AggregationStrategyBeanAdapter adapter = new AggregationStrategyBeanAdapter(aggStrategy, getAggregationStrategyMethodName()); + if (getAggregationStrategyMethodAllowNull() != null) { + adapter.setAllowNullNewExchange(getAggregationStrategyMethodAllowNull()); + adapter.setAllowNullOldExchange(getAggregationStrategyMethodAllowNull()); + } + strategy = adapter; } else { throw new IllegalArgumentException("Cannot find AggregationStrategy in Registry with name: " + aggregationStrategyRef); } @@ -159,6 +166,14 @@ public class EnrichDefinition extends NoOutputDefinition<EnrichDefinition> { this.aggregationStrategyMethodName = aggregationStrategyMethodName; } + public Boolean getAggregationStrategyMethodAllowNull() { + return aggregationStrategyMethodAllowNull; + } + + public void setAggregationStrategyMethodAllowNull(Boolean aggregationStrategyMethodAllowNull) { + this.aggregationStrategyMethodAllowNull = aggregationStrategyMethodAllowNull; + } + public AggregationStrategy getAggregationStrategy() { return aggregationStrategy; } http://git-wip-us.apache.org/repos/asf/camel/blob/bbc1a4df/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java index 291bace..8730ce1 100644 --- a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java @@ -48,6 +48,8 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i private String strategyRef; @XmlAttribute private String strategyMethodName; + @XmlAttribute + private Boolean strategyMethodAllowNull; @XmlTransient private ExecutorService executorService; @XmlAttribute @@ -126,6 +128,16 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i } /** + * Sets allowing null when using a POJO as {@link AggregationStrategy}. + * + * @return the builder + */ + public MulticastDefinition aggregationStrategyMethodAllowNull() { + setStrategyMethodAllowNull(true); + return this; + } + + /** * Uses the {@link java.util.concurrent.ExecutorService} to do the multicasting work * * @return the builder @@ -255,7 +267,12 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i if (strategy == null && strategyRef != null) { Object aggStrategy = routeContext.lookup(strategyRef, Object.class); if (aggStrategy instanceof AggregationStrategy) { - strategy = (AggregationStrategy) aggStrategy; + AggregationStrategyBeanAdapter adapter = new AggregationStrategyBeanAdapter(aggStrategy, getStrategyMethodName()); + if (getStrategyMethodAllowNull() != null) { + adapter.setAllowNullNewExchange(getStrategyMethodAllowNull()); + adapter.setAllowNullOldExchange(getStrategyMethodAllowNull()); + } + strategy = adapter; } else if (aggStrategy != null) { strategy = new AggregationStrategyBeanAdapter(aggStrategy, getStrategyMethodName()); } else { @@ -340,6 +357,14 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i this.strategyMethodName = strategyMethodName; } + public Boolean getStrategyMethodAllowNull() { + return strategyMethodAllowNull; + } + + public void setStrategyMethodAllowNull(Boolean strategyMethodAllowNull) { + this.strategyMethodAllowNull = strategyMethodAllowNull; + } + public String getExecutorServiceRef() { return executorServiceRef; } http://git-wip-us.apache.org/repos/asf/camel/blob/bbc1a4df/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java index ede7fea..1fc9042 100644 --- a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java @@ -50,6 +50,8 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio private String aggregationStrategyRef; @XmlAttribute(name = "strategyMethodName") private String aggregationStrategyMethodName; + @XmlAttribute(name = "strategyMethodAllowNull") + private Boolean aggregationStrategyMethodAllowNull; @XmlTransient private AggregationStrategy aggregationStrategy; @@ -120,7 +122,12 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio if (aggStrategy instanceof AggregationStrategy) { strategy = (AggregationStrategy) aggStrategy; } else if (aggStrategy != null) { - strategy = new AggregationStrategyBeanAdapter(aggStrategy, getAggregationStrategyMethodName()); + AggregationStrategyBeanAdapter adapter = new AggregationStrategyBeanAdapter(aggStrategy, getAggregationStrategyMethodName()); + if (getAggregationStrategyMethodAllowNull() != null) { + adapter.setAllowNullNewExchange(getAggregationStrategyMethodAllowNull()); + adapter.setAllowNullOldExchange(getAggregationStrategyMethodAllowNull()); + } + strategy = adapter; } else { throw new IllegalArgumentException("Cannot find AggregationStrategy in Registry with name: " + aggregationStrategyRef); } @@ -173,6 +180,14 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio this.aggregationStrategyMethodName = aggregationStrategyMethodName; } + public Boolean getAggregationStrategyMethodAllowNull() { + return aggregationStrategyMethodAllowNull; + } + + public void setAggregationStrategyMethodAllowNull(Boolean aggregationStrategyMethodAllowNull) { + this.aggregationStrategyMethodAllowNull = aggregationStrategyMethodAllowNull; + } + public AggregationStrategy getAggregationStrategy() { return aggregationStrategy; } http://git-wip-us.apache.org/repos/asf/camel/blob/bbc1a4df/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java b/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java index e6f6ce8..fc12fb1 100644 --- a/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java @@ -60,6 +60,8 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext @XmlAttribute private String strategyMethodName; @XmlAttribute + private Boolean strategyMethodAllowNull; + @XmlAttribute private String executorServiceRef; @XmlAttribute private Boolean stopOnException; @@ -173,7 +175,12 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext if (aggStrategy instanceof AggregationStrategy) { strategy = (AggregationStrategy) aggStrategy; } else if (aggStrategy != null) { - strategy = new AggregationStrategyBeanAdapter(aggStrategy, getStrategyMethodName()); + AggregationStrategyBeanAdapter adapter = new AggregationStrategyBeanAdapter(aggStrategy, getStrategyMethodName()); + if (getStrategyMethodAllowNull() != null) { + adapter.setAllowNullNewExchange(getStrategyMethodAllowNull()); + adapter.setAllowNullOldExchange(getStrategyMethodAllowNull()); + } + strategy = adapter; } else { throw new IllegalArgumentException("Cannot find AggregationStrategy in Registry with name: " + strategyRef); } @@ -245,6 +252,16 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext } /** + * Sets allowing null when using a POJO as {@link AggregationStrategy}. + * + * @return the builder + */ + public RecipientListDefinition<Type> aggregationStrategyMethodAllowNull() { + setStrategyMethodAllowNull(true); + return this; + } + + /** * Ignore the invalidate endpoint exception when try to create a producer with that endpoint * * @return the builder @@ -389,6 +406,14 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext this.strategyMethodName = strategyMethodName; } + public Boolean getStrategyMethodAllowNull() { + return strategyMethodAllowNull; + } + + public void setStrategyMethodAllowNull(Boolean strategyMethodAllowNull) { + this.strategyMethodAllowNull = strategyMethodAllowNull; + } + public String getExecutorServiceRef() { return executorServiceRef; } http://git-wip-us.apache.org/repos/asf/camel/blob/bbc1a4df/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java b/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java index d723e20..0b54bdd 100644 --- a/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java @@ -54,6 +54,8 @@ public class SplitDefinition extends ExpressionNode implements ExecutorServiceAw @XmlAttribute private String strategyMethodName; @XmlAttribute + private Boolean strategyMethodAllowNull; + @XmlAttribute private String executorServiceRef; @XmlAttribute private Boolean streaming; @@ -131,7 +133,12 @@ public class SplitDefinition extends ExpressionNode implements ExecutorServiceAw if (aggStrategy instanceof AggregationStrategy) { strategy = (AggregationStrategy) aggStrategy; } else if (aggStrategy != null) { - strategy = new AggregationStrategyBeanAdapter(aggStrategy, strategyMethodName); + AggregationStrategyBeanAdapter adapter = new AggregationStrategyBeanAdapter(aggStrategy, getStrategyMethodName()); + if (getStrategyMethodAllowNull() != null) { + adapter.setAllowNullNewExchange(getStrategyMethodAllowNull()); + adapter.setAllowNullOldExchange(getStrategyMethodAllowNull()); + } + strategy = adapter; } else { throw new IllegalArgumentException("Cannot find AggregationStrategy in Registry with name: " + strategyRef); } @@ -180,6 +187,16 @@ public class SplitDefinition extends ExpressionNode implements ExecutorServiceAw } /** + * Sets allowing null when using a POJO as {@link AggregationStrategy}. + * + * @return the builder + */ + public SplitDefinition aggregationStrategyMethodAllowNull() { + setStrategyMethodAllowNull(true); + return this; + } + + /** * Doing the splitting work in parallel * * @return the builder @@ -354,6 +371,14 @@ public class SplitDefinition extends ExpressionNode implements ExecutorServiceAw this.strategyMethodName = strategyMethodName; } + public Boolean getStrategyMethodAllowNull() { + return strategyMethodAllowNull; + } + + public void setStrategyMethodAllowNull(Boolean strategyMethodAllowNull) { + this.strategyMethodAllowNull = strategyMethodAllowNull; + } + public String getExecutorServiceRef() { return executorServiceRef; } http://git-wip-us.apache.org/repos/asf/camel/blob/bbc1a4df/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterAllowNullTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterAllowNullTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterAllowNullTest.java new file mode 100644 index 0000000..c9cc79f --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterAllowNullTest.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregator; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.util.toolbox.AggregationStrategies; + +public class AggregationStrategyBeanAdapterAllowNullTest extends ContextTestSupport { + + private MyUserAppender appender = new MyUserAppender(); + + public void testAggregate() throws Exception { + getMockEndpoint("mock:result").expectedMessageCount(1); + + template.sendBody("direct:start", new User("Claus")); + template.sendBody("direct:start", new User("James")); + template.sendBody("direct:start", new User("Jonathan")); + + assertMockEndpointsSatisfied(); + + List names = getMockEndpoint("mock:result").getReceivedExchanges().get(0).getIn().getBody(List.class); + assertEquals("Claus", names.get(0)); + assertEquals("James", names.get(1)); + assertEquals("Jonathan", names.get(2)); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .aggregate(constant(true), AggregationStrategies.beanAllowNull(appender, "addUsers")) + .completionSize(3) + .to("mock:result"); + } + }; + } + + public static final class MyUserAppender { + + public List addUsers(List names, User user) { + if (names == null) { + names = new ArrayList(); + } + names.add(user.getName()); + return names; + } + } + + public static final class User { + private String name; + + public User(String name) { + this.name = name; + } + + public String getName() { + return name; + } + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/bbc1a4df/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregationStrategyBeanAdapterAllowNullTest.java ---------------------------------------------------------------------- diff --git a/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregationStrategyBeanAdapterAllowNullTest.java b/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregationStrategyBeanAdapterAllowNullTest.java new file mode 100644 index 0000000..51b4a5b --- /dev/null +++ b/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregationStrategyBeanAdapterAllowNullTest.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.spring.processor.aggregator; + +import org.apache.camel.CamelContext; +import org.apache.camel.processor.aggregator.AggregationStrategyBeanAdapterAllowNullTest; + +import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext; + +/** + * @version + */ +public class SpringAggregationStrategyBeanAdapterAllowNullTest extends AggregationStrategyBeanAdapterAllowNullTest { + + protected CamelContext createCamelContext() throws Exception { + return createSpringCamelContext(this, "org/apache/camel/spring/processor/aggregator/SpringAggregationStrategyBeanAdapterAllowNullTest.xml"); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/bbc1a4df/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregationStrategyBeanAdapterAllowNullTest.xml ---------------------------------------------------------------------- diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregationStrategyBeanAdapterAllowNullTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregationStrategyBeanAdapterAllowNullTest.xml new file mode 100644 index 0000000..4066d79 --- /dev/null +++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregationStrategyBeanAdapterAllowNullTest.xml @@ -0,0 +1,42 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<beans xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation=" + http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd + http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd + "> + + <!-- START SNIPPET: e1 --> + <camelContext xmlns="http://camel.apache.org/schema/spring"> + <route> + <from uri="direct:start"/> + <aggregate strategyRef="myAppender" strategyMethodName="addUsers" strategyMethodAllowNull="true" + completionSize="3"> + <correlationExpression> + <constant>true</constant> + </correlationExpression> + <to uri="mock:result"/> + </aggregate> + </route> + </camelContext> + + <bean id="myAppender" class="org.apache.camel.processor.aggregator.AggregationStrategyBeanAdapterAllowNullTest.MyUserAppender"/> + <!-- END SNIPPET: e1 --> + +</beans>