This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e20c53  CAMEL-10474: Aggregate EIP allow to force complete of all 
previous groups decided by AggregationStrategy.
2e20c53 is described below

commit 2e20c532241533d732f7a23a1cc57639ff97d8dd
Author: Claus Ibsen <[email protected]>
AuthorDate: Tue Feb 27 13:00:19 2018 +0100

    CAMEL-10474: Aggregate EIP allow to force complete of all previous groups 
decided by AggregationStrategy.
---
 camel-core/src/main/docs/eips/aggregate-eip.adoc   | 84 +++++++++++++++-------
 .../processor/aggregate/AggregateProcessor.java    |  9 ++-
 ...eCompletionHeaderInAggregationStrategyTest.java | 72 +++++++++++++++++++
 3 files changed, 140 insertions(+), 25 deletions(-)

diff --git a/camel-core/src/main/docs/eips/aggregate-eip.adoc 
b/camel-core/src/main/docs/eips/aggregate-eip.adoc
index c2795af..80f239b 100644
--- a/camel-core/src/main/docs/eips/aggregate-eip.adoc
+++ b/camel-core/src/main/docs/eips/aggregate-eip.adoc
@@ -188,8 +188,7 @@ consumer etc)
 The aggregator provides a pluggable repository which you can implement
 your own `org.apache.camel.spi.AggregationRepository`. +
  If you need persistent repository then you can use either Camel
-link:hawtdb.html[HawtDB], link:leveldb.html[LevelDB], or
-<<sql-component,SQL Component>> components.
+link:leveldb.html[LevelDB], or <<sql-component,SQL Component>> components.
 
 === Using TimeoutAwareAggregationStrategy
 
@@ -224,7 +223,7 @@ completion predicates / sizes / timeouts etc, and complete 
the group.
 
 For example the following logic (from an unit test) will complete the
 group if the message body size is larger than 5. This is done by setting
-the property Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP to true.
+the property `Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP` to `true`.
 
 [source,java]
 ----
@@ -245,34 +244,71 @@ the property Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP 
to true.
     }
 ----
 
+
+=== Completing all previous group decided from the AggregationStrategy
+
+*Available as of Camel 2.21*
+
+The `AggregationStrategy` can now included a property on the
+returned `Exchange` that contains a boolean to indicate if all previous
+groups should be completed. This allows to overrule any existing
+completion predicates / sizes / timeouts etc, and complete all the existing
+previous group.
+
+For example the following logic (from an unit test) will complete all the
+previous group when a new aggregation group is started. This is done by
+setting the property `Exchange.AGGREGATION_COMPLETE_ALL_GROUPS` to `true`.
+
+[source,java]
+----
+    public final class MyCompletionStrategy implements AggregationStrategy {
+        @Override
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            if (oldExchange == null) {
+                // we start a new correlation group, so complete all previous 
groups
+                
newExchange.setProperty(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
+                return newExchange;
+            }
+
+            String body1 = oldExchange.getIn().getBody(String.class);
+            String body2 = newExchange.getIn().getBody(String.class);
+
+            oldExchange.getIn().setBody(body1 + body2);
+            return oldExchange;
+        }
+    }
+----
+
  
 
 === Manually Force the Completion of All Aggregated Exchanges Immediately
 
-*Available as of Camel 2.9* +
- You can manually trigger completion of all current aggregated exchanges
+*Available as of Camel 2.9*
+
+You can manually trigger completion of all current aggregated exchanges
 by sending a message containing the header
-Exchange.AGGREGATION_COMPLETE_ALL_GROUPS set to true. The message is
+`Exchange.AGGREGATION_COMPLETE_ALL_GROUPS` set to `true`. The message is
 considered a signal message only, the message headers/contents will not
 be processed otherwise.
 
-*Available as of Camel 2.11* +
- You can alternatively set the header
-Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE to true to trigger
+*Available as of Camel 2.11*
+
+You can alternatively set the header
+`Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE` to `true` to trigger
 completion of all groups after processing the current message.
 
 === Using a List<V> in AggregationStrategy
 
 *Available as of Camel 2.11*
 
-If you want to aggregate some value from the messages <V> into a List<V>
+If you want to aggregate some value from the messages `<V>` into a `List<V>`
 then we have added a
 `org.apache.camel.processor.aggregate.AbstractListAggregationStrategy`
 abstract class in *Camel 2.11* that makes this easier. The completed
-Exchange that is sent out of the aggregator will contain the List<V> in
+Exchange that is sent out of the aggregator will contain the `List<V>` in
 the message body.
 
-For example to aggregate a List<Integer> you can extend this class as
+For example to aggregate a `List<Integer>` you can extend this class as
 shown below, and implement the `getValue` method:
 
 === Using AggregateController
@@ -359,10 +395,10 @@ from("direct:start")
     .to("mock:result");
 ----
 
-As a result we have one outgoing link:exchange.html[Exchange] being
-routed the the "mock:result" endpoint. The exchange is a holder
-containing all the incoming Exchanges. +
- To get access to these exchanges you need to access them from a
+As a result we have one outgoing `Exchange` being
+routed the the `"mock:result"` endpoint. The exchange is a holder
+containing all the incoming Exchanges.
+To get access to these exchanges you need to access them from a
 property on the outgoing exchange as shown:
 
 [source,java]
@@ -418,7 +454,7 @@ public String append(String existing, String next) {
 
 In the method below, we have only 4 parameters, so the 1st parameter is
 the body of the `oldExchange`, and the 2nd is the Map of the
-`oldExchange} headers, and the 3rd is paired to the body of the {{newExchange`,
+`oldExchange` headers, and the 3rd is paired to the body of the `newExchange`,
 and the 4th parameter is the Map of the `newExchange` headers:
 
 [source,java]
@@ -429,7 +465,7 @@ public String append(String existing, Map existingHeaders, 
String next, Map next
 ----
 
 And finally if we have 6 parameters the we also have the properties of
-the link:exchange.html[Exchange]s:
+the Exchanges:
 
 [source,java]
 ----
@@ -439,7 +475,7 @@ public String append(String existing, Map existingHeaders, 
Map existingPropertie
 }
 ----
 
-To use this with the link:aggregator2.html[Aggregate] EIP we can use a
+To use this with the Aggregate EIP we can use a
 POJO with the aggregate logic as follows:
 
 [source,java]
@@ -542,21 +578,21 @@ By default when using POJOs as AggregationStrategy, then 
the method is
 use the option `strategyMethodAllowNull` to configure this. Where as
 without using POJOs then you may have `null` as `oldExchange` or
 `newExchange` parameters. For example the
-link:aggregator2.html[Aggregate] EIP will invoke the
+Aggregate EIP will invoke the
 `AggregationStrategy` with `oldExchange` as null, for the first
-link:exchange.html[Exchange] incoming to the aggregator. And then for
+Exchange incoming to the aggregator. And then for
 subsequent link:exchange.html[Exchange]s then `oldExchange` and
 `newExchange` parameters are both not null.
 
-Example with link:content-enricher.html[Content Enricher] and no data
+Example with Content Enricher EIP and no data
 
-Though with POJOs as AggregationStrategy we made this simpler and only
+Though with POJOs as `AggregationStrategy` we made this simpler and only
 call the method when `oldExchange` and `newExchange` is not null, as
 that would be the most common use-case. If you need to allow
 `oldExchange` or `newExchange` to be null, then you can configure this
 with the POJO using the `AggregationStrategyBeanAdapter` as shown below.
 On the bean adapter we call `setAllowNullNewExchange` to allow the new
-exchange to be null.
+exchange to be `null`.
 
 [source,java]
 ----
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
 
b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 5978b05..07bcfc5 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -347,7 +347,6 @@ public class AggregateProcessor extends ServiceSupport 
implements AsyncProcessor
             lock.lock();
             try {
                 aggregated = doAggregation(key, copy);
-
             } finally {
                 lock.unlock();
             }
@@ -457,6 +456,14 @@ public class AggregateProcessor extends ServiceSupport 
implements AsyncProcessor
             throw new CamelExchangeException("AggregationStrategy " + 
aggregationStrategy + " returned null which is not allowed", newExchange);
         }
 
+        // check for the special exchange property to force completion of all 
groups
+        boolean completeAllGroups = 
answer.getProperty(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, false, 
boolean.class);
+        if (completeAllGroups) {
+            // remove the exchange property so we do not complete again
+            answer.removeProperty(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS);
+            forceCompletionOfAllGroups();
+        }
+
         // special for some repository implementations
         if (aggregationRepository instanceof RecoverableAggregationRepository) 
{
             boolean valid = oldExchange == null || 
answer.getExchangeId().equals(oldExchange.getExchangeId());
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderInAggregationStrategyTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderInAggregationStrategyTest.java
new file mode 100644
index 0000000..8f1a16a
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderInAggregationStrategyTest.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.junit.Test;
+
+/**
+ * To test CAMEL-10474
+ */
+public class AggregateForceCompletionHeaderInAggregationStrategyTest extends 
ContextTestSupport {
+
+    @Test
+    public void testCompletePreviousOnNewGroup() throws Exception {
+        getMockEndpoint("mock:aggregated").expectedBodiesReceived("AAA", "BB");
+
+        template.sendBody("direct:start", "A,A,A,B,B");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .split(body())
+                        .to("log:input?showAll=true")
+                        .aggregate(simple("${body}"), new 
MyAggregationStrategy())
+                            
.completionPredicate(exchangeProperty(Exchange.SPLIT_COMPLETE))
+                    .to("log:aggregated", "mock:aggregated");
+            }
+        };
+    }
+
+    public static class MyAggregationStrategy implements AggregationStrategy {
+
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            if (oldExchange == null) {
+                // we start a new correlation group, so complete all previous 
groups
+                
newExchange.setProperty(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
+                return newExchange;
+            }
+
+            String body1 = oldExchange.getIn().getBody(String.class);
+            String body2 = newExchange.getIn().getBody(String.class);
+
+            oldExchange.getIn().setBody(body1 + body2);
+            // copy over flag to know when splitting is done on the old 
exchange
+            oldExchange.setProperty(Exchange.SPLIT_COMPLETE, 
newExchange.getProperty(Exchange.SPLIT_COMPLETE));
+            return oldExchange;
+        }
+    }
+}
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to