This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 2e20c53 CAMEL-10474: Aggregate EIP allow to force complete of all
previous groups decided by AggregationStrategy.
2e20c53 is described below
commit 2e20c532241533d732f7a23a1cc57639ff97d8dd
Author: Claus Ibsen <[email protected]>
AuthorDate: Tue Feb 27 13:00:19 2018 +0100
CAMEL-10474: Aggregate EIP allow to force complete of all previous groups
decided by AggregationStrategy.
---
camel-core/src/main/docs/eips/aggregate-eip.adoc | 84 +++++++++++++++-------
.../processor/aggregate/AggregateProcessor.java | 9 ++-
...eCompletionHeaderInAggregationStrategyTest.java | 72 +++++++++++++++++++
3 files changed, 140 insertions(+), 25 deletions(-)
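
Note: the behaviour this commit adds can be sketched in plain Java without Camel on the classpath. This is a minimal illustration, not the AggregateProcessor implementation. The class and method names (CompleteAllGroupsSketch, onMessage, completeAllOpenGroups, run) are hypothetical, and the final flush at end of input only stands in for the Exchange.SPLIT_COMPLETE completion predicate used by the unit test in this commit.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the aggregator; none of these names are Camel API.
final class CompleteAllGroupsSketch {

    // Open correlation groups, keyed by correlation value, in arrival order.
    private final Map<String, StringBuilder> openGroups = new LinkedHashMap<>();
    // Bodies of the groups that have been completed, in completion order.
    private final List<String> completed = new ArrayList<>();

    // Feed one message whose body is also its correlation key.
    void onMessage(String body) {
        StringBuilder group = openGroups.get(body);
        if (group == null) {
            // A new correlation group starts: complete every group that is
            // already open before the new one is stored.
            completeAllOpenGroups();
            openGroups.put(body, new StringBuilder(body));
        } else {
            // Existing group: concatenate the bodies, as the test strategy does.
            group.append(body);
        }
    }

    // Complete whatever is still open (also used at end of input).
    void completeAllOpenGroups() {
        for (StringBuilder group : openGroups.values()) {
            completed.add(group.toString());
        }
        openGroups.clear();
    }

    List<String> completed() {
        return completed;
    }

    // Split a comma-separated input, aggregate it, and return completed bodies.
    static List<String> run(String csv) {
        CompleteAllGroupsSketch sketch = new CompleteAllGroupsSketch();
        for (String part : csv.split(",")) {
            sketch.onMessage(part);
        }
        sketch.completeAllOpenGroups();
        return sketch.completed();
    }

    public static void main(String[] args) {
        System.out.println(run("A,A,A,B,B")); // prints [AAA, BB]
    }
}
```

With the input "A,A,A,B,B" the first B opens a new group and therefore completes the open A group, which matches the bodies "AAA" and "BB" that the new unit test expects.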
diff --git a/camel-core/src/main/docs/eips/aggregate-eip.adoc b/camel-core/src/main/docs/eips/aggregate-eip.adoc
index c2795af..80f239b 100644
--- a/camel-core/src/main/docs/eips/aggregate-eip.adoc
+++ b/camel-core/src/main/docs/eips/aggregate-eip.adoc
@@ -188,8 +188,7 @@ consumer etc)
The aggregator provides a pluggable repository which you can implement
your own `org.apache.camel.spi.AggregationRepository`. +
If you need persistent repository then you can use either Camel
-link:hawtdb.html[HawtDB], link:leveldb.html[LevelDB], or
-<<sql-component,SQL Component>> components.
+link:leveldb.html[LevelDB], or <<sql-component,SQL Component>> components.
=== Using TimeoutAwareAggregationStrategy
@@ -224,7 +223,7 @@ completion predicates / sizes / timeouts etc, and complete the group.
For example the following logic (from an unit test) will complete the
group if the message body size is larger than 5. This is done by setting
-the property Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP to true.
+the property `Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP` to `true`.
[source,java]
----
@@ -245,34 +244,71 @@ the property Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP to true.
}
----
+
+=== Completing all previous groups decided from the AggregationStrategy
+
+*Available as of Camel 2.21*
+
+The `AggregationStrategy` can now include a property on the
+returned `Exchange` that contains a boolean to indicate if all previous
+groups should be completed. This allows overruling any existing
+completion predicates / sizes / timeouts etc, and completes all the existing
+previous groups.
+
+For example the following logic (from a unit test) will complete all the
+previous groups when a new aggregation group is started. This is done by
+setting the property `Exchange.AGGREGATION_COMPLETE_ALL_GROUPS` to `true`.
+
+[source,java]
+----
+ public final class MyCompletionStrategy implements AggregationStrategy {
+ @Override
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ if (oldExchange == null) {
+ // we start a new correlation group, so complete all previous groups
+ newExchange.setProperty(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
+ return newExchange;
+ }
+
+ String body1 = oldExchange.getIn().getBody(String.class);
+ String body2 = newExchange.getIn().getBody(String.class);
+
+ oldExchange.getIn().setBody(body1 + body2);
+ return oldExchange;
+ }
+ }
+----
+
=== Manually Force the Completion of All Aggregated Exchanges Immediately
-*Available as of Camel 2.9* +
- You can manually trigger completion of all current aggregated exchanges
+*Available as of Camel 2.9*
+
+You can manually trigger completion of all current aggregated exchanges
by sending a message containing the header
-Exchange.AGGREGATION_COMPLETE_ALL_GROUPS set to true. The message is
+`Exchange.AGGREGATION_COMPLETE_ALL_GROUPS` set to `true`. The message is
considered a signal message only, the message headers/contents will not
be processed otherwise.
-*Available as of Camel 2.11* +
- You can alternatively set the header
-Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE to true to trigger
+*Available as of Camel 2.11*
+
+You can alternatively set the header
+`Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE` to `true` to trigger
completion of all groups after processing the current message.
=== Using a List<V> in AggregationStrategy
*Available as of Camel 2.11*
-If you want to aggregate some value from the messages <V> into a List<V>
+If you want to aggregate some value from the messages `<V>` into a `List<V>`
then we have added a
`org.apache.camel.processor.aggregate.AbstractListAggregationStrategy`
abstract class in *Camel 2.11* that makes this easier. The completed
-Exchange that is sent out of the aggregator will contain the List<V> in
+Exchange that is sent out of the aggregator will contain the `List<V>` in
the message body.
-For example to aggregate a List<Integer> you can extend this class as
+For example to aggregate a `List<Integer>` you can extend this class as
shown below, and implement the `getValue` method:
=== Using AggregateController
@@ -359,10 +395,10 @@ from("direct:start")
.to("mock:result");
----
-As a result we have one outgoing link:exchange.html[Exchange] being
-routed the the "mock:result" endpoint. The exchange is a holder
-containing all the incoming Exchanges. +
- To get access to these exchanges you need to access them from a
+As a result we have one outgoing `Exchange` being
+routed to the `"mock:result"` endpoint. The exchange is a holder
+containing all the incoming Exchanges.
+To get access to these exchanges you need to access them from a
property on the outgoing exchange as shown:
[source,java]
@@ -418,7 +454,7 @@ public String append(String existing, String next) {
In the method below, we have only 4 parameters, so the 1st parameter is
the body of the `oldExchange`, and the 2nd is the Map of the
-`oldExchange} headers, and the 3rd is paired to the body of the {{newExchange`,
+`oldExchange` headers, and the 3rd is paired to the body of the `newExchange`,
and the 4th parameter is the Map of the `newExchange` headers:
[source,java]
@@ -429,7 +465,7 @@ public String append(String existing, Map existingHeaders, String next, Map next
----
And finally if we have 6 parameters the we also have the properties of
-the link:exchange.html[Exchange]s:
+the Exchanges:
[source,java]
----
@@ -439,7 +475,7 @@ public String append(String existing, Map existingHeaders, Map existingPropertie
}
----
-To use this with the link:aggregator2.html[Aggregate] EIP we can use a
+To use this with the Aggregate EIP we can use a
POJO with the aggregate logic as follows:
[source,java]
@@ -542,21 +578,21 @@ By default when using POJOs as AggregationStrategy, then the method is
use the option `strategyMethodAllowNull` to configure this. Where as
without using POJOs then you may have `null` as `oldExchange` or
`newExchange` parameters. For example the
-link:aggregator2.html[Aggregate] EIP will invoke the
+Aggregate EIP will invoke the
`AggregationStrategy` with `oldExchange` as null, for the first
-link:exchange.html[Exchange] incoming to the aggregator. And then for
+Exchange incoming to the aggregator. And then for
subsequent link:exchange.html[Exchange]s then `oldExchange` and
`newExchange` parameters are both not null.
-Example with link:content-enricher.html[Content Enricher] and no data
+Example with Content Enricher EIP and no data
-Though with POJOs as AggregationStrategy we made this simpler and only
+Though with POJOs as `AggregationStrategy` we made this simpler and only
call the method when `oldExchange` and `newExchange` is not null, as
that would be the most common use-case. If you need to allow
`oldExchange` or `newExchange` to be null, then you can configure this
with the POJO using the `AggregationStrategyBeanAdapter` as shown below.
On the bean adapter we call `setAllowNullNewExchange` to allow the new
-exchange to be null.
+exchange to be `null`.
[source,java]
----
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 5978b05..07bcfc5 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -347,7 +347,6 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
lock.lock();
try {
aggregated = doAggregation(key, copy);
-
} finally {
lock.unlock();
}
@@ -457,6 +456,14 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
throw new CamelExchangeException("AggregationStrategy " + aggregationStrategy + " returned null which is not allowed", newExchange);
}
+ // check for the special exchange property to force completion of all groups
+ boolean completeAllGroups = answer.getProperty(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, false, boolean.class);
+ if (completeAllGroups) {
+ // remove the exchange property so we do not complete again
+ answer.removeProperty(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS);
+ forceCompletionOfAllGroups();
+ }
+
// special for some repository implementations
if (aggregationRepository instanceof RecoverableAggregationRepository) {
boolean valid = oldExchange == null || answer.getExchangeId().equals(oldExchange.getExchangeId());
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderInAggregationStrategyTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderInAggregationStrategyTest.java
new file mode 100644
index 0000000..8f1a16a
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderInAggregationStrategyTest.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.junit.Test;
+
+/**
+ * To test CAMEL-10474
+ */
+public class AggregateForceCompletionHeaderInAggregationStrategyTest extends ContextTestSupport {
+
+ @Test
+ public void testCompletePreviousOnNewGroup() throws Exception {
+ getMockEndpoint("mock:aggregated").expectedBodiesReceived("AAA", "BB");
+
+ template.sendBody("direct:start", "A,A,A,B,B");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .split(body())
+ .to("log:input?showAll=true")
+ .aggregate(simple("${body}"), new MyAggregationStrategy())
+ .completionPredicate(exchangeProperty(Exchange.SPLIT_COMPLETE))
+ .to("log:aggregated", "mock:aggregated");
+ }
+ };
+ }
+
+ public static class MyAggregationStrategy implements AggregationStrategy {
+
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ if (oldExchange == null) {
+ // we start a new correlation group, so complete all previous groups
+ newExchange.setProperty(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
+ return newExchange;
+ }
+
+ String body1 = oldExchange.getIn().getBody(String.class);
+ String body2 = newExchange.getIn().getBody(String.class);
+
+ oldExchange.getIn().setBody(body1 + body2);
+ // copy over flag to know when splitting is done on the old exchange
+ oldExchange.setProperty(Exchange.SPLIT_COMPLETE, newExchange.getProperty(Exchange.SPLIT_COMPLETE));
+ return oldExchange;
+ }
+ }
+}
\ No newline at end of file