Author: davsclaus Date: Tue Oct 30 16:09:35 2012 New Revision: 1403760 URL: http://svn.apache.org/viewvc?rev=1403760&view=rev Log: CAMEL-5579: Added AbstractListAggregationStrategy to make it easier to aggregate a List<V> with the Aggregator EIP. Thanks to Claudio Corsi for the partial patch.
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AbstractListAggregationStrategy.java (with props) camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomListAggregationStrategyTest.java (with props) Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java Added: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AbstractListAggregationStrategy.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AbstractListAggregationStrategy.java?rev=1403760&view=auto ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AbstractListAggregationStrategy.java (added) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AbstractListAggregationStrategy.java Tue Oct 30 16:09:35 2012 @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.processor.aggregate; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.camel.Exchange; +import org.apache.camel.impl.DefaultExchange; + +/** + * Aggregate all exchanges into a {@link List} of values defined by the {@link #getValue(Exchange)} call. + * The combined Exchange will hold all the aggregated exchanges in a {@link java.util.List} + * as a exchange property with the key {@link org.apache.camel.Exchange#GROUPED_EXCHANGE}. + * <p/> + * The method {@link #isStoreAsBodyOnCompletion()} determines if the aggregated {@link List} should + * be stored on the {@link org.apache.camel.Message#setBody(Object)} or be kept as a property + * on the exchange. + * <br/> + * The default behavior to store as message body, allows to more easily group together a list of values + * and have its result stored as a {@link List} on the completed {@link Exchange}. + * + * @since 2.11 + */ +public abstract class AbstractListAggregationStrategy<V> implements CompletionAwareAggregationStrategy { + + /** + * This method is implemented by the sub-class and is called to retrieve + * an instance of the value that will be aggregated and forwarded to the + * receiving end point. + * <p/> + * If <tt>null</tt> is returned, then the value is <b>not</b> added to the {@link List}. + * + * @param exchange The exchange that is used to retrieve the value from + * @return An instance of V that is the associated value of the passed exchange + */ + public abstract V getValue(Exchange exchange); + + /** + * Whether to store the completed aggregated {@link List} as message body, or to keep as property on the exchange. + * <p/> + * The default behavior is <tt>true</tt> to store as message body. + * + * @return <tt>true</tt> to store as message body, <tt>false</tt> to keep as property on the exchange. 
+ */ + public boolean isStoreAsBodyOnCompletion() { + return true; + } + + @SuppressWarnings("unchecked") + public void onCompletion(Exchange exchange) { + if (isStoreAsBodyOnCompletion()) { + List<V> list = (List<V>) exchange.removeProperty(Exchange.GROUPED_EXCHANGE); + if (list != null) { + exchange.getIn().setBody(list); + } + } + } + + /** + * This method will aggregate the old and new exchange and return the result. + * + * @param oldExchange The oldest exchange, can be null + * @param newExchange The newest exchange, can be null + * @return a composite exchange of the old and/or new exchanges + */ + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + List<V> list; + Exchange answer = oldExchange; + + if (oldExchange == null) { + answer = new DefaultExchange(newExchange); + list = getList(answer); + } else { + list = getList(oldExchange); + } + + if (newExchange != null) { + V value = getValue(newExchange); + if (value != null) { + list.add(value); + } + } + + return answer; + } + + @SuppressWarnings("unchecked") + private List<V> getList(Exchange exchange) { + List<V> list = exchange.getProperty(Exchange.GROUPED_EXCHANGE, List.class); + if (list == null) { + list = new ArrayList<V>(); + exchange.setProperty(Exchange.GROUPED_EXCHANGE, list); + } + return list; + } + +} Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AbstractListAggregationStrategy.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AbstractListAggregationStrategy.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java?rev=1403760&r1=1403759&r2=1403760&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java Tue Oct 30 16:09:35 2012 @@ -16,11 +16,7 @@ */ package org.apache.camel.processor.aggregate; -import java.util.ArrayList; -import java.util.List; - import org.apache.camel.Exchange; -import org.apache.camel.impl.DefaultExchange; /** * Aggregate all exchanges into a single combined Exchange holding all the aggregated exchanges @@ -29,25 +25,17 @@ import org.apache.camel.impl.DefaultExch * * @version */ -public class GroupedExchangeAggregationStrategy implements AggregationStrategy { +public class GroupedExchangeAggregationStrategy extends AbstractListAggregationStrategy<Exchange> { + + @Override + public boolean isStoreAsBodyOnCompletion() { + // keep the list as a property to be compatible with old behavior + return false; + } - @SuppressWarnings("unchecked") - public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { - List<Exchange> list; - Exchange answer = oldExchange; - - if (oldExchange == null) { - answer = new DefaultExchange(newExchange); - list = new ArrayList<Exchange>(); - answer.setProperty(Exchange.GROUPED_EXCHANGE, list); - } else { - list = oldExchange.getProperty(Exchange.GROUPED_EXCHANGE, List.class); - } - - if (newExchange != null) { - list.add(newExchange); - } - return answer; + @Override + public Exchange getValue(Exchange exchange) { + return exchange; } } Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomListAggregationStrategyTest.java URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomListAggregationStrategyTest.java?rev=1403760&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomListAggregationStrategyTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomListAggregationStrategyTest.java Tue Oct 30 16:09:35 2012 @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.processor.aggregator; + +import java.util.List; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.processor.aggregate.AbstractListAggregationStrategy; + +/** + * + */ +public class CustomListAggregationStrategyTest extends ContextTestSupport { + + @SuppressWarnings("unchecked") + public void testCustomAggregationStrategy() throws Exception { + MockEndpoint result = getMockEndpoint("mock:result"); + result.expectedMessageCount(1); + + template.sendBodyAndHeader("direct:start", "100", "id", "1"); + template.sendBodyAndHeader("direct:start", "150", "id", "1"); + template.sendBodyAndHeader("direct:start", "130", "id", "1"); + + assertMockEndpointsSatisfied(); + + // the list will be stored as the message body by default + List<Integer> numbers = result.getExchanges().get(0).getIn().getBody(List.class); + assertNotNull(numbers); + assertEquals(Integer.valueOf("100"), numbers.get(0)); + assertEquals(Integer.valueOf("150"), numbers.get(1)); + assertEquals(Integer.valueOf("130"), numbers.get(2)); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .aggregate(new MyListOfNumbersStrategy()).header("id") + .completionSize(3) + .to("mock:result"); + } + }; + } + + // START SNIPPET: e1 + /** + * Our strategy just group a list of integers. 
+ */ + public final class MyListOfNumbersStrategy extends AbstractListAggregationStrategy<Integer> { + + @Override + public Integer getValue(Exchange exchange) { + // the message body contains a number, so just return that as-is + return exchange.getIn().getBody(Integer.class); + } + } + // END SNIPPET: e1 + +} Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomListAggregationStrategyTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomListAggregationStrategyTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date