This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.0.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.0.x by this push:
     new 2deab6f  CAMEL-14414: for aggregation group completion setting and 
removal is handled through methods to enforce code cohesion and avoid 
repetitions. fixed issue for CURRENT_GROUP and ALL_GROUPS flag removal. adapted 
tests and documentation (#3554)
2deab6f is described below

commit 2deab6f89353e679e62533f9447bfb355703c7b2
Author: grigoni <gianandrea.rig...@gmail.com>
AuthorDate: Sat Feb 8 09:29:25 2020 +0100

    CAMEL-14414: for aggregation group completion setting and removal is 
handled through methods to enforce code cohesion and avoid repetitions. fixed 
issue for CURRENT_GROUP and ALL_GROUPS flag removal. adapted tests and 
documentation (#3554)
---
 .../LevelDBAggregateForceCompletionHeaderTest.java |  2 +-
 .../JdbcAggregateForceCompletionHeaderTest.java    |  2 +-
 .../processor/aggregate/AggregateProcessor.java    | 53 ++++++++++++++++------
 .../src/main/docs/eips/aggregate-eip.adoc          |  2 +-
 ...eCompletionHeaderInAggregationStrategyTest.java |  2 +
 .../AggregateForceCompletionHeaderTest.java        |  3 +-
 .../AggregationStrategyCompleteByPropertyTest.java |  8 ++++
 7 files changed, 54 insertions(+), 18 deletions(-)

diff --git 
a/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java
 
b/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java
index c033dcf..3c2f744 100644
--- 
a/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java
+++ 
b/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java
@@ -55,7 +55,7 @@ public class LevelDBAggregateForceCompletionHeaderTest 
extends CamelTestSupport
         
getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY,
 "force");
 
         //now send the signal message to trigger completion of all groups, 
message should NOT be aggregated
-        template.sendBodyAndHeader("direct:start", "test5", 
Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
+        template.sendBodyAndProperty("direct:start", "test5", 
Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
 
         assertMockEndpointsSatisfied();
     }
diff --git 
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java
 
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java
index d26cb59..ed6b157 100644
--- 
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java
+++ 
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java
@@ -45,7 +45,7 @@ public class JdbcAggregateForceCompletionHeaderTest extends 
AbstractJdbcAggregat
         
getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY,
 "force");
 
         //now send the signal message to trigger completion of all groups, 
message should NOT be aggregated
-        template.sendBodyAndHeader("direct:start", "test5", 
Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
+        template.sendBodyAndProperty("direct:start", "test5", 
Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
 
         assertMockEndpointsSatisfied();
     }
diff --git 
a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
 
b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index ad0f96d..81c0af2 100644
--- 
a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ 
b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -313,7 +313,8 @@ public class AggregateProcessor extends 
AsyncProcessorSupport implements Navigat
         }
 
         //check for the special header to force completion of all groups (and 
ignore the exchange otherwise)
-        if (getAndRemoveBooleanHeader(exchange, 
Exchange.AGGREGATION_COMPLETE_ALL_GROUPS)) {
+        if (isCompleteAllGroups(exchange)) {
+            removeFlagCompleteAllGroups(exchange);
             forceCompletionOfAllGroups();
             callback.done(false);
             return;
@@ -376,10 +377,10 @@ public class AggregateProcessor extends 
AsyncProcessorSupport implements Navigat
         // the aggregated output runs in another unit of work
         Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
 
-        // remove the complete all groups headers as it should not be on the 
copy
-        copy.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP);
-        copy.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS);
-        
copy.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE);
+        // remove the complete all groups flags as it should not be on the copy
+        removeFlagCompleteCurrentGroup(copy);
+        removeFlagCompleteAllGroups(copy);
+        removeFlagCompleteAllGroupsInclusive(copy);
 
         List<Exchange> aggregated = null;
         lock.lock();
@@ -398,23 +399,45 @@ public class AggregateProcessor extends 
AsyncProcessorSupport implements Navigat
         }
 
         // check for the special header to force completion of all groups 
(inclusive of the message)
-        if (getAndRemoveBooleanHeader(exchange, 
Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE)) {
+        if (isCompleteAllGroupsInclusive(exchange)) {
+            removeFlagCompleteAllGroupsInclusive(exchange);
             forceCompletionOfAllGroups();
         }
 
         callback.done(false);
     }
 
-    protected boolean getBooleanProperty(Exchange exchange, String key) {
-        return camelContext.getTypeConverter().convertTo(boolean.class, 
exchange, exchange.getProperty(key));
+    private Object removeFlagCompleteCurrentGroup(Exchange exchange) {
+        //before everywhere : return 
exchange.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP);
+        return 
exchange.removeProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP);
     }
 
-    protected boolean getAndRemoveBooleanProperty(Exchange exchange, String 
key) {
-        return camelContext.getTypeConverter().convertTo(boolean.class, 
exchange, exchange.removeProperty(key));
+    private Boolean isCompleteCurrentGroup(Exchange exchange) {
+        return 
exchange.getProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP, false, 
boolean.class);
     }
 
-    protected boolean getAndRemoveBooleanHeader(Exchange exchange, String key) 
{
-        return camelContext.getTypeConverter().convertTo(boolean.class, 
exchange, exchange.getIn().removeHeader(key));
+    private Object removeFlagCompleteAllGroups(Exchange exchange) {
+        Object removedHeader = 
exchange.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS);
+        Object removedProp = 
exchange.removeProperty(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS);
+        return removedHeader == null ? removedProp : removedHeader;
+    }
+
+    private Boolean isCompleteAllGroups(Exchange exchange) {
+        boolean retVal = 
exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, false, 
boolean.class);
+        if (!retVal) {
+            // according to doc it is a property but it is sometimes read as 
header
+            // some test don't fail because they use the header expression 
which contains a fallback to properties
+            retVal = 
exchange.getProperty(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, false, 
boolean.class);
+        }
+        return retVal;
+    }
+
+    private Object removeFlagCompleteAllGroupsInclusive(Exchange exchange) {
+        return 
exchange.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE);
+    }
+
+    private Boolean isCompleteAllGroupsInclusive(Exchange exchange) {
+        return 
exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE, 
false, boolean.class);
     }
 
     /**
@@ -522,7 +545,8 @@ public class AggregateProcessor extends 
AsyncProcessorSupport implements Navigat
         }
 
         // check for the special exchange property to force completion of all 
groups
-        if (getAndRemoveBooleanProperty(answer, 
Exchange.AGGREGATION_COMPLETE_ALL_GROUPS)) {
+        if (isCompleteAllGroups(answer)) {
+            removeFlagCompleteAllGroups(answer);
             forceCompletionOfAllGroups();
         } else if (isCompletionOnNewCorrelationGroup() && originalExchange == 
null) {
             // its a new group so force complete of all existing groups
@@ -644,7 +668,8 @@ public class AggregateProcessor extends 
AsyncProcessorSupport implements Navigat
             }
         }
 
-        if (getBooleanProperty(exchange, 
Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP)) {
+        if (isCompleteCurrentGroup(exchange)) {
+            removeFlagCompleteCurrentGroup(exchange);
             return COMPLETED_BY_STRATEGY;
         }
 
diff --git a/core/camel-core-engine/src/main/docs/eips/aggregate-eip.adoc 
b/core/camel-core-engine/src/main/docs/eips/aggregate-eip.adoc
index 2708b1f..005d6b4 100644
--- a/core/camel-core-engine/src/main/docs/eips/aggregate-eip.adoc
+++ b/core/camel-core-engine/src/main/docs/eips/aggregate-eip.adoc
@@ -278,7 +278,7 @@ setting the property 
`Exchange.AGGREGATION_COMPLETE_ALL_GROUPS` to `true`.
 == Manually Force the Completion of All Aggregated Exchanges Immediately
 
 You can manually trigger completion of all current aggregated exchanges
-by sending a message containing the header
+by sending an exchange containing the property
 `Exchange.AGGREGATION_COMPLETE_ALL_GROUPS` set to `true`. The message is
 considered a signal message only, the message headers/contents will not
 be processed otherwise.
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderInAggregationStrategyTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderInAggregationStrategyTest.java
index cced169..b6b9542 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderInAggregationStrategyTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderInAggregationStrategyTest.java
@@ -30,6 +30,8 @@ public class 
AggregateForceCompletionHeaderInAggregationStrategyTest extends Con
     @Test
     public void testCompletePreviousOnNewGroup() throws Exception {
         getMockEndpoint("mock:aggregated").expectedBodiesReceived("AAA", "BB");
+        
getMockEndpoint("mock:aggregated").allMessages().header(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS).isNull();
+        
getMockEndpoint("mock:aggregated").allMessages().exchangeProperty(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS).isNull();
 
         template.sendBody("direct:start", "A,A,A,B,B");
 
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java
index d438615..c785cd1 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java
@@ -47,10 +47,11 @@ public class AggregateForceCompletionHeaderTest extends 
ContextTestSupport {
         
getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3",
 "test2test4");
         
getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY,
 "force");
         
getMockEndpoint("mock:aggregated").allMessages().header(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS).isNull();
+        
getMockEndpoint("mock:aggregated").allMessages().exchangeProperty(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS).isNull();
 
         // now send the signal message to trigger completion of all groups,
         // message should NOT be aggregated
-        template.sendBodyAndHeader("direct:start", "test5", 
Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
+        template.sendBodyAndProperty("direct:start", "test5", 
Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
 
         assertMockEndpointsSatisfied();
     }
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyCompleteByPropertyTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyCompleteByPropertyTest.java
index 26e613b..0a55c4d 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyCompleteByPropertyTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyCompleteByPropertyTest.java
@@ -32,6 +32,14 @@ public class AggregationStrategyCompleteByPropertyTest 
extends ContextTestSuppor
         
result.message(0).exchangeProperty(Exchange.AGGREGATED_COMPLETED_BY).isEqualTo("strategy");
         
result.message(1).exchangeProperty(Exchange.AGGREGATED_COMPLETED_BY).isEqualTo("strategy");
 
+        // 
org.apache.camel.builder.ExpressionBuilder.headerExpression(java.lang.String) 
is going to property fallback
+        // the test (without the fix) will fail into error:
+        // java.lang.AssertionError: Assertion error at index 0 on mock 
mock://aggregated with predicate:
+        // header(CamelAggregationCompleteCurrentGroup) is null evaluated as: 
true is null on Exchange[ID-MacBook-Pro-1578822701664-0-2]
+        
getMockEndpoint("mock:aggregated").allMessages().header(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP).isNull();
+        // according to manual
+        
getMockEndpoint("mock:aggregated").allMessages().exchangeProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP).isNull();
+
         template.sendBodyAndHeader("direct:start", "A", "id", 123);
         template.sendBodyAndHeader("direct:start", "B", "id", 123);
         template.sendBodyAndHeader("direct:start", "C", "id", 123);

Reply via email to