This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-2.25.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-2.25.x by this push:
     new 3bf5aed  back port fix for group aggregation for Camel 2.25.x  (#3514)
3bf5aed is described below

commit 3bf5aed38f0cfd22711cd0ce9f08619eee5e41ad
Author: grigoni <gianandrea.rig...@gmail.com>
AuthorDate: Sat Jan 25 12:03:35 2020 +0100

    back port fix for group aggregation for Camel 2.25.x  (#3514)
    
    * CAMEL-14414: the aggregation group completion setting and its removal 
are now handled through dedicated methods to improve code cohesion and avoid 
repetition. Fixed the removal of the CURRENT_GROUP and ALL_GROUPS flags. 
Adapted the documentation accordingly.
    
    * CAMEL-14414: adapted the group-aggregation-related tests to match the 
expectation that group flags are cleaned up after processing
    
    * CAMEL-14414: polish
---
 camel-core/src/main/docs/eips/aggregate-eip.adoc   |  2 +-
 .../processor/aggregate/AggregateProcessor.java    | 65 +++++++++++++++++-----
 ...eCompletionHeaderInAggregationStrategyTest.java |  2 +
 .../AggregateForceCompletionHeaderTest.java        |  3 +-
 .../AggregationStrategyCompleteByPropertyTest.java |  8 +++
 .../HawtDBAggregateForceCompletionHeaderTest.java  |  2 +-
 .../LevelDBAggregateForceCompletionHeaderTest.java |  2 +-
 .../JdbcAggregateForceCompletionHeaderTest.java    |  2 +-
 8 files changed, 66 insertions(+), 20 deletions(-)

diff --git a/camel-core/src/main/docs/eips/aggregate-eip.adoc 
b/camel-core/src/main/docs/eips/aggregate-eip.adoc
index f7f2699..900350a 100644
--- a/camel-core/src/main/docs/eips/aggregate-eip.adoc
+++ b/camel-core/src/main/docs/eips/aggregate-eip.adoc
@@ -286,7 +286,7 @@ setting the property 
`Exchange.AGGREGATION_COMPLETE_ALL_GROUPS` to `true`.
 *Available as of Camel 2.9*
 
 You can manually trigger completion of all current aggregated exchanges
-by sending a message containing the header
+by sending an exchange containing the property
 `Exchange.AGGREGATION_COMPLETE_ALL_GROUPS` set to `true`. The message is
 considered a signal message only, the message headers/contents will not
 be processed otherwise.
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
 
b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index c32d34a..41971cf 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -283,10 +283,10 @@ public class AggregateProcessor extends ServiceSupport 
implements AsyncProcessor
         }
 
         //check for the special header to force completion of all groups (and 
ignore the exchange otherwise)
-        boolean completeAllGroups = 
exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, false, 
boolean.class);
+        boolean completeAllGroups = isCompleteAllGroups(exchange);
         if (completeAllGroups) {
             // remove the header so we do not complete again
-            
exchange.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS);
+            removeFlagCompleteAllGroups(exchange);
             forceCompletionOfAllGroups();
             return;
         }
@@ -320,9 +320,9 @@ public class AggregateProcessor extends ServiceSupport 
implements AsyncProcessor
                 Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, 
false);
 
                 // remove the complete all groups headers as it should not be 
on the copy
-                
copy.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP);
-                
copy.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS);
-                
copy.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE);
+                removeFlagCompleteCurrentGroup(copy);
+                removeFlagCompleteAllGroups(copy);
+                removeFlagCompleteAllGroupsInclusive(copy);
 
                 try {
                     aggregated = doAggregation(key, copy);
@@ -350,9 +350,9 @@ public class AggregateProcessor extends ServiceSupport 
implements AsyncProcessor
             Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, 
false);
 
             // remove the complete all groups headers as it should not be on 
the copy
-            
copy.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP);
-            
copy.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS);
-            
copy.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE);
+            removeFlagCompleteCurrentGroup(copy);
+            removeFlagCompleteAllGroups(copy);
+            removeFlagCompleteAllGroupsInclusive(copy);
 
             // when memory based then its fast using synchronized, but if the 
aggregation repository is IO
             // bound such as JPA etc then concurrent aggregation per 
correlation key could
@@ -372,15 +372,49 @@ public class AggregateProcessor extends ServiceSupport 
implements AsyncProcessor
             }
         }
 
-        // check for the special header to force completion of all groups 
(inclusive of the message)
-        boolean completeAllGroupsInclusive = 
exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE, 
false, boolean.class);
+        // check for the special flag to force completion of all groups 
(inclusive of the message)
+        boolean completeAllGroupsInclusive = 
isCompleteAllGroupsInclusive(exchange);
         if (completeAllGroupsInclusive) {
-            // remove the header so we do not complete again
-            
exchange.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE);
+            // remove the flag so we do not complete again
+            removeFlagCompleteAllGroupsInclusive(exchange);
             forceCompletionOfAllGroups();
         }
     }
 
+    private Object removeFlagCompleteCurrentGroup(Exchange exchange) {
+        //before everywhere : return 
exchange.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP);
+        return 
exchange.removeProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP);
+    }
+
+    private Boolean isCompleteCurrentGroup(Exchange exchange) {
+        return 
exchange.getProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP, false, 
boolean.class);
+    }
+
+    private Object removeFlagCompleteAllGroups(Exchange exchange) {
+        Object removedHeader = 
exchange.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS);
+        Object removedProp = 
exchange.removeProperty(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS);
+        return removedHeader == null ? removedProp: removedHeader;
+    }
+
+    private Boolean isCompleteAllGroups(Exchange exchange) {
+        boolean retVal;
+        retVal = 
exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, false, 
boolean.class);
+        if(!retVal) {
+            // according to doc it is a property but it is sometimes read as 
header
+            // some test don't fail because they use the header expression 
which contains a fallback to properties
+            retVal = 
exchange.getProperty(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, false, 
boolean.class);
+        }
+        return retVal;
+    }
+
+    private Object removeFlagCompleteAllGroupsInclusive(Exchange exchange) {
+        return 
exchange.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE);
+    }
+
+    private Boolean isCompleteAllGroupsInclusive(Exchange exchange) {
+        return 
exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE, 
false, boolean.class);
+    }
+
     /**
      * Aggregates the exchange with the given correlation key
      * <p/>
@@ -473,10 +507,10 @@ public class AggregateProcessor extends ServiceSupport 
implements AsyncProcessor
         }
 
         // check for the special exchange property to force completion of all 
groups
-        boolean completeAllGroups = 
answer.getProperty(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, false, 
boolean.class);
+        boolean completeAllGroups = isCompleteAllGroups(answer);
         if (completeAllGroups) {
             // remove the exchange property so we do not complete again
-            answer.removeProperty(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS);
+            removeFlagCompleteAllGroups(answer);
             forceCompletionOfAllGroups();
         } else if (isCompletionOnNewCorrelationGroup() && originalExchange == 
null) {
             // its a new group so force complete of all existing groups
@@ -613,7 +647,8 @@ public class AggregateProcessor extends ServiceSupport 
implements AsyncProcessor
             }
         }
 
-        if (exchange.getProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP, 
false, boolean.class)) {
+        if (isCompleteCurrentGroup(exchange)) {
+            removeFlagCompleteCurrentGroup(exchange);
             return "strategy";
         }
 
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderInAggregationStrategyTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderInAggregationStrategyTest.java
index 8f1a16a..f458492 100644
--- 
a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderInAggregationStrategyTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderInAggregationStrategyTest.java
@@ -30,6 +30,8 @@ public class 
AggregateForceCompletionHeaderInAggregationStrategyTest extends Con
     @Test
     public void testCompletePreviousOnNewGroup() throws Exception {
         getMockEndpoint("mock:aggregated").expectedBodiesReceived("AAA", "BB");
+        
getMockEndpoint("mock:aggregated").allMessages().header(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS).isNull();
+        
getMockEndpoint("mock:aggregated").allMessages().exchangeProperty(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS).isNull();
 
         template.sendBody("direct:start", "A,A,A,B,B");
 
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java
index 11e1c3a9..576e806 100644
--- 
a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java
@@ -46,9 +46,10 @@ public class AggregateForceCompletionHeaderTest extends 
ContextTestSupport {
         
getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3",
 "test2test4");
         
getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY,
 "force");
         
getMockEndpoint("mock:aggregated").allMessages().header(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS).isNull();
+        
getMockEndpoint("mock:aggregated").allMessages().exchangeProperty(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS).isNull();
 
         //now send the signal message to trigger completion of all groups, 
message should NOT be aggregated
-        template.sendBodyAndHeader("direct:start", "test5", 
Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
+        template.sendBodyAndProperty("direct:start", "test5", 
Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
 
         assertMockEndpointsSatisfied();
     }
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyCompleteByPropertyTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyCompleteByPropertyTest.java
index b69d5bd..dd56568 100644
--- 
a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyCompleteByPropertyTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyCompleteByPropertyTest.java
@@ -35,6 +35,14 @@ public class AggregationStrategyCompleteByPropertyTest 
extends ContextTestSuppor
         
result.message(0).exchangeProperty(Exchange.AGGREGATED_COMPLETED_BY).isEqualTo("strategy");
         
result.message(1).exchangeProperty(Exchange.AGGREGATED_COMPLETED_BY).isEqualTo("strategy");
 
+        // 
org.apache.camel.builder.ExpressionBuilder.headerExpression(java.lang.String) 
is going to property fallback
+        // the test (without the fix) will fail into error:
+        // java.lang.AssertionError: Assertion error at index 0 on mock 
mock://aggregated with predicate:
+        // header(CamelAggregationCompleteCurrentGroup) is null evaluated as: 
true is null on Exchange[ID-MacBook-Pro-1578822701664-0-2]
+        
getMockEndpoint("mock:aggregated").allMessages().header(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP).isNull();
+        // according to manual
+        
getMockEndpoint("mock:aggregated").allMessages().exchangeProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP).isNull();
+
         template.sendBodyAndHeader("direct:start", "A", "id", 123);
         template.sendBodyAndHeader("direct:start", "B", "id", 123);
         template.sendBodyAndHeader("direct:start", "C", "id", 123);
diff --git 
a/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java
 
b/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java
index 8fd453f..db2be47 100644
--- 
a/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java
+++ 
b/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java
@@ -54,7 +54,7 @@ public class HawtDBAggregateForceCompletionHeaderTest extends 
CamelTestSupport {
         
getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY,
 "force");
 
         //now send the signal message to trigger completion of all groups, 
message should NOT be aggregated
-        template.sendBodyAndHeader("direct:start", "test5", 
Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
+        template.sendBodyAndProperty("direct:start", "test5", 
Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
 
         assertMockEndpointsSatisfied();
     }
diff --git 
a/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java
 
b/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java
index 598ae86..4dfc1fb 100644
--- 
a/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java
+++ 
b/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java
@@ -54,7 +54,7 @@ public class LevelDBAggregateForceCompletionHeaderTest 
extends CamelTestSupport
         
getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY,
 "force");
 
         //now send the signal message to trigger completion of all groups, 
message should NOT be aggregated
-        template.sendBodyAndHeader("direct:start", "test5", 
Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
+        template.sendBodyAndProperty("direct:start", "test5", 
Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
 
         assertMockEndpointsSatisfied();
     }
diff --git 
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java
 
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java
index 5dfb8e1..3df38bb 100644
--- 
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java
+++ 
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java
@@ -45,7 +45,7 @@ public class JdbcAggregateForceCompletionHeaderTest extends 
AbstractJdbcAggregat
         
getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY,
 "force");
 
         //now send the signal message to trigger completion of all groups, 
message should NOT be aggregated
-        template.sendBodyAndHeader("direct:start", "test5", 
Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
+        template.sendBodyAndProperty("direct:start", "test5", 
Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
 
         assertMockEndpointsSatisfied();
     }

Reply via email to