Author: davsclaus
Date: Tue Oct 30 16:09:35 2012
New Revision: 1403760

URL: http://svn.apache.org/viewvc?rev=1403760&view=rev
Log:
CAMEL-5579: Added AbstractListAggregationStrategy to make it easier to 
aggregate a List<V> with the aggregator eip. Thanks to Claudio Corsi for 
partial patch.

Added:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AbstractListAggregationStrategy.java
   (with props)
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomListAggregationStrategyTest.java
   (with props)
Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java

Added: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AbstractListAggregationStrategy.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AbstractListAggregationStrategy.java?rev=1403760&view=auto
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AbstractListAggregationStrategy.java
 (added)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AbstractListAggregationStrategy.java
 Tue Oct 30 16:09:35 2012
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultExchange;
+
+/**
+ * Aggregate all exchanges into a {@link List} of values defined by the {@link 
#getValue(Exchange)} call.
+ * The combined Exchange will hold all the aggregated values in a {@link 
java.util.List}
+ * as an exchange property with the key {@link 
org.apache.camel.Exchange#GROUPED_EXCHANGE}.
+ * <p/>
+ * The method {@link #isStoreAsBodyOnCompletion()} determines if the 
aggregated {@link List} should
+ * be stored on the {@link org.apache.camel.Message#setBody(Object)} or be 
kept as a property
+ * on the exchange.
+ * <br/>
+ * The default behavior, to store as message body, makes it easier to group 
together a list of values
+ * and have the result stored as a {@link List} on the completed {@link 
Exchange}.
+ *
+ * @since 2.11
+ */
+public abstract class AbstractListAggregationStrategy<V> implements 
CompletionAwareAggregationStrategy {
+
+    /**
+     * This method is implemented by the sub-class and is called to retrieve
+     * an instance of the value that will be aggregated and forwarded to the
+     * receiving endpoint.
+     * <p/>
+     * If <tt>null</tt> is returned, then the value is <b>not</b> added to the 
{@link List}.
+     *
+     * @param exchange  The exchange that is used to retrieve the value from
+     * @return An instance of V that is the associated value of the passed 
exchange
+     */
+    public abstract V getValue(Exchange exchange);
+
+    /**
+     * Whether to store the completed aggregated {@link List} as message body, 
or to keep as property on the exchange.
+     * <p/>
+     * The default behavior is <tt>true</tt> to store as message body.
+     *
+     * @return <tt>true</tt> to store as message body, <tt>false</tt> to keep 
as property on the exchange.
+     */
+    public boolean isStoreAsBodyOnCompletion() {
+        return true;
+    }
+
+    @SuppressWarnings("unchecked")
+    public void onCompletion(Exchange exchange) {
+        if (isStoreAsBodyOnCompletion()) {
+            List<V> list = (List<V>) 
exchange.removeProperty(Exchange.GROUPED_EXCHANGE);
+            if (list != null) {
+                exchange.getIn().setBody(list);
+            }
+        }
+    }
+
+    /**
+     * This method will aggregate the old and new exchange and return the 
result.
+     *
+     * @param oldExchange The oldest exchange, can be null
+     * @param newExchange The newest exchange, can be null
+     * @return a composite exchange of the old and/or new exchanges
+     */
+    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        List<V> list;
+        Exchange answer = oldExchange;
+
+        if (oldExchange == null) {
+            answer = new DefaultExchange(newExchange);
+            list = getList(answer);
+        } else {
+            list = getList(oldExchange);
+        }
+
+        if (newExchange != null) {
+            V value = getValue(newExchange);
+            if (value != null) {
+                list.add(value);
+            }
+        }
+
+        return answer;
+    }
+
+    @SuppressWarnings("unchecked")
+    private List<V> getList(Exchange exchange) {
+        List<V> list = exchange.getProperty(Exchange.GROUPED_EXCHANGE, 
List.class);
+        if (list == null) {
+            list = new ArrayList<V>();
+            exchange.setProperty(Exchange.GROUPED_EXCHANGE, list);
+        }
+        return list;
+    }
+
+}

Propchange: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AbstractListAggregationStrategy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AbstractListAggregationStrategy.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java?rev=1403760&r1=1403759&r2=1403760&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java
 Tue Oct 30 16:09:35 2012
@@ -16,11 +16,7 @@
  */
 package org.apache.camel.processor.aggregate;
 
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.camel.Exchange;
-import org.apache.camel.impl.DefaultExchange;
 
 /**
  * Aggregate all exchanges into a single combined Exchange holding all the 
aggregated exchanges
@@ -29,25 +25,17 @@ import org.apache.camel.impl.DefaultExch
  *
  * @version 
  */
-public class GroupedExchangeAggregationStrategy implements AggregationStrategy 
{
+public class GroupedExchangeAggregationStrategy extends 
AbstractListAggregationStrategy<Exchange> {
+
+    @Override
+    public boolean isStoreAsBodyOnCompletion() {
+        // keep the list as a property to be compatible with old behavior
+        return false;
+    }
 
-    @SuppressWarnings("unchecked")
-    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
-        List<Exchange> list;
-        Exchange answer = oldExchange;
-
-        if (oldExchange == null) {
-            answer = new DefaultExchange(newExchange);
-            list = new ArrayList<Exchange>();
-            answer.setProperty(Exchange.GROUPED_EXCHANGE, list);
-        } else {
-            list = oldExchange.getProperty(Exchange.GROUPED_EXCHANGE, 
List.class);
-        }
-
-        if (newExchange != null) {
-            list.add(newExchange);
-        }
-        return answer;
+    @Override
+    public Exchange getValue(Exchange exchange) {
+        return exchange;
     }
 
 }

Added: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomListAggregationStrategyTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomListAggregationStrategyTest.java?rev=1403760&view=auto
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomListAggregationStrategyTest.java
 (added)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomListAggregationStrategyTest.java
 Tue Oct 30 16:09:35 2012
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.AbstractListAggregationStrategy;
+
+/**
+ *
+ */
+public class CustomListAggregationStrategyTest extends ContextTestSupport {
+
+    @SuppressWarnings("unchecked")
+    public void testCustomAggregationStrategy() throws Exception {
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedMessageCount(1);
+
+        template.sendBodyAndHeader("direct:start", "100", "id", "1");
+        template.sendBodyAndHeader("direct:start", "150", "id", "1");
+        template.sendBodyAndHeader("direct:start", "130", "id", "1");
+
+        assertMockEndpointsSatisfied();
+
+        // the list will be stored as the message body by default
+        List<Integer> numbers = 
result.getExchanges().get(0).getIn().getBody(List.class);
+        assertNotNull(numbers);
+        assertEquals(Integer.valueOf("100"), numbers.get(0));
+        assertEquals(Integer.valueOf("150"), numbers.get(1));
+        assertEquals(Integer.valueOf("130"), numbers.get(2));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .aggregate(new MyListOfNumbersStrategy()).header("id")
+                    .completionSize(3)
+                    .to("mock:result");
+            }
+        };
+    }
+
+    // START SNIPPET: e1
+    /**
+     * Our strategy just groups a list of integers.
+     */
+    public final class MyListOfNumbersStrategy extends 
AbstractListAggregationStrategy<Integer> {
+
+        @Override
+        public Integer getValue(Exchange exchange) {
+            // the message body contains a number, so just return that as-is
+            return exchange.getIn().getBody(Integer.class);
+        }
+    }
+    // END SNIPPET: e1
+
+}

Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomListAggregationStrategyTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomListAggregationStrategyTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date


Reply via email to