This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-2.25.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-2.25.x by this push: new 3bf5aed back port fix for group aggregation for Camel 2.25.x (#3514) 3bf5aed is described below commit 3bf5aed38f0cfd22711cd0ce9f08619eee5e41ad Author: grigoni <gianandrea.rig...@gmail.com> AuthorDate: Sat Jan 25 12:03:35 2020 +0100 back port fix for group aggregation for Camel 2.25.x (#3514) * CAMEL-14414: for aggregation group completion setting and removal is handled through methods to enforce code cohesion and avoid repetitions. fixed issue for CURRENT_GROUP and ALL_GROUPS flag removal. adapted documentation accordingly * CAMEL-14414: adapted group aggregation related test to match expectation that group flags are cleaned up after processing * CAMEL-14414: polish --- camel-core/src/main/docs/eips/aggregate-eip.adoc | 2 +- .../processor/aggregate/AggregateProcessor.java | 65 +++++++++++++++++----- ...eCompletionHeaderInAggregationStrategyTest.java | 2 + .../AggregateForceCompletionHeaderTest.java | 3 +- .../AggregationStrategyCompleteByPropertyTest.java | 8 +++ .../HawtDBAggregateForceCompletionHeaderTest.java | 2 +- .../LevelDBAggregateForceCompletionHeaderTest.java | 2 +- .../JdbcAggregateForceCompletionHeaderTest.java | 2 +- 8 files changed, 66 insertions(+), 20 deletions(-) diff --git a/camel-core/src/main/docs/eips/aggregate-eip.adoc b/camel-core/src/main/docs/eips/aggregate-eip.adoc index f7f2699..900350a 100644 --- a/camel-core/src/main/docs/eips/aggregate-eip.adoc +++ b/camel-core/src/main/docs/eips/aggregate-eip.adoc @@ -286,7 +286,7 @@ setting the property `Exchange.AGGREGATION_COMPLETE_ALL_GROUPS` to `true`. *Available as of Camel 2.9* You can manually trigger completion of all current aggregated exchanges -by sending a message containing the header +by sending an exchange containing the property `Exchange.AGGREGATION_COMPLETE_ALL_GROUPS` set to `true`. The message is considered a signal message only, the message headers/contents will not be processed otherwise. diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java index c32d34a..41971cf 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java @@ -283,10 +283,10 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor } //check for the special header to force completion of all groups (and ignore the exchange otherwise) - boolean completeAllGroups = exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, false, boolean.class); + boolean completeAllGroups = isCompleteAllGroups(exchange); if (completeAllGroups) { // remove the header so we do not complete again - exchange.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS); + removeFlagCompleteAllGroups(exchange); forceCompletionOfAllGroups(); return; } @@ -320,9 +320,9 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false); // remove the complete all groups headers as it should not be on the copy - copy.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP); - copy.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS); - copy.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE); + removeFlagCompleteCurrentGroup(copy); + removeFlagCompleteAllGroups(copy); + removeFlagCompleteAllGroupsInclusive(copy); try { aggregated = doAggregation(key, copy); @@ -350,9 +350,9 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false); // remove the complete all groups headers as it should not be on the copy - copy.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP); - copy.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS); - copy.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE); + removeFlagCompleteCurrentGroup(copy); + removeFlagCompleteAllGroups(copy); + removeFlagCompleteAllGroupsInclusive(copy); // when memory based then its fast using synchronized, but if the aggregation repository is IO // bound such as JPA etc then concurrent aggregation per correlation key could @@ -372,15 +372,49 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor } } - // check for the special header to force completion of all groups (inclusive of the message) - boolean completeAllGroupsInclusive = exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE, false, boolean.class); + // check for the special flag to force completion of all groups (inclusive of the message) + boolean completeAllGroupsInclusive = isCompleteAllGroupsInclusive(exchange); if (completeAllGroupsInclusive) { - // remove the header so we do not complete again - exchange.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE); + // remove the flag so we do not complete again + removeFlagCompleteAllGroupsInclusive(exchange); forceCompletionOfAllGroups(); } } + private Object removeFlagCompleteCurrentGroup(Exchange exchange) { + //before everywhere : return exchange.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP); + return exchange.removeProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP); + } + + private Boolean isCompleteCurrentGroup(Exchange exchange) { + return exchange.getProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP, false, boolean.class); + } + + private Object removeFlagCompleteAllGroups(Exchange exchange) { + Object removedHeader = exchange.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS); + Object removedProp = exchange.removeProperty(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS); + return removedHeader == null ? removedProp: removedHeader; + } + + private Boolean isCompleteAllGroups(Exchange exchange) { + boolean retVal; + retVal = exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, false, boolean.class); + if(!retVal) { + // according to doc it is a property but it is sometimes read as header + // some test don't fail because they use the header expression which contains a fallback to properties + retVal = exchange.getProperty(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, false, boolean.class); + } + return retVal; + } + + private Object removeFlagCompleteAllGroupsInclusive(Exchange exchange) { + return exchange.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE); + } + + private Boolean isCompleteAllGroupsInclusive(Exchange exchange) { + return exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE, false, boolean.class); + } + /** * Aggregates the exchange with the given correlation key * <p/> @@ -473,10 +507,10 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor } // check for the special exchange property to force completion of all groups - boolean completeAllGroups = answer.getProperty(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, false, boolean.class); + boolean completeAllGroups = isCompleteAllGroups(answer); if (completeAllGroups) { // remove the exchange property so we do not complete again - answer.removeProperty(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS); + removeFlagCompleteAllGroups(answer); forceCompletionOfAllGroups(); } else if (isCompletionOnNewCorrelationGroup() && originalExchange == null) { // its a new group so force complete of all existing groups @@ -613,7 +647,8 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor } } - if (exchange.getProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP, false, boolean.class)) { + if (isCompleteCurrentGroup(exchange)) { + removeFlagCompleteCurrentGroup(exchange); return "strategy"; } diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderInAggregationStrategyTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderInAggregationStrategyTest.java index 8f1a16a..f458492 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderInAggregationStrategyTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderInAggregationStrategyTest.java @@ -30,6 +30,8 @@ public class AggregateForceCompletionHeaderInAggregationStrategyTest extends Con @Test public void testCompletePreviousOnNewGroup() throws Exception { getMockEndpoint("mock:aggregated").expectedBodiesReceived("AAA", "BB"); + getMockEndpoint("mock:aggregated").allMessages().header(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS).isNull(); + getMockEndpoint("mock:aggregated").allMessages().exchangeProperty(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS).isNull(); template.sendBody("direct:start", "A,A,A,B,B"); diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java index 11e1c3a9..576e806 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java @@ -46,9 +46,10 @@ public class AggregateForceCompletionHeaderTest extends ContextTestSupport { getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4"); getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force"); getMockEndpoint("mock:aggregated").allMessages().header(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS).isNull(); + getMockEndpoint("mock:aggregated").allMessages().exchangeProperty(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS).isNull(); //now send the signal message to trigger completion of all groups, message should NOT be aggregated - template.sendBodyAndHeader("direct:start", "test5", Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true); + template.sendBodyAndProperty("direct:start", "test5", Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true); assertMockEndpointsSatisfied(); } diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyCompleteByPropertyTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyCompleteByPropertyTest.java index b69d5bd..dd56568 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyCompleteByPropertyTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyCompleteByPropertyTest.java @@ -35,6 +35,14 @@ public class AggregationStrategyCompleteByPropertyTest extends ContextTestSuppor result.message(0).exchangeProperty(Exchange.AGGREGATED_COMPLETED_BY).isEqualTo("strategy"); result.message(1).exchangeProperty(Exchange.AGGREGATED_COMPLETED_BY).isEqualTo("strategy"); + // org.apache.camel.builder.ExpressionBuilder.headerExpression(java.lang.String) is going to property fallback + // the test (without the fix) will fail into error: + // java.lang.AssertionError: Assertion error at index 0 on mock mock://aggregated with predicate: + // header(CamelAggregationCompleteCurrentGroup) is null evaluated as: true is null on Exchange[ID-MacBook-Pro-1578822701664-0-2] + getMockEndpoint("mock:aggregated").allMessages().header(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP).isNull(); + // according to manual + getMockEndpoint("mock:aggregated").allMessages().exchangeProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP).isNull(); + template.sendBodyAndHeader("direct:start", "A", "id", 123); template.sendBodyAndHeader("direct:start", "B", "id", 123); template.sendBodyAndHeader("direct:start", "C", "id", 123); diff --git a/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java b/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java index 8fd453f..db2be47 100644 --- a/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java +++ b/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java @@ -54,7 +54,7 @@ public class HawtDBAggregateForceCompletionHeaderTest extends CamelTestSupport { getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force"); //now send the signal message to trigger completion of all groups, message should NOT be aggregated - template.sendBodyAndHeader("direct:start", "test5", Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true); + template.sendBodyAndProperty("direct:start", "test5", Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true); assertMockEndpointsSatisfied(); } diff --git a/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java b/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java index 598ae86..4dfc1fb 100644 --- a/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java +++ b/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java @@ -54,7 +54,7 @@ public class LevelDBAggregateForceCompletionHeaderTest extends CamelTestSupport getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force"); //now send the signal message to trigger completion of all groups, message should NOT be aggregated - template.sendBodyAndHeader("direct:start", "test5", Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true); + template.sendBodyAndProperty("direct:start", "test5", Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true); assertMockEndpointsSatisfied(); } diff --git a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java index 5dfb8e1..3df38bb 100644 --- a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java +++ b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java @@ -45,7 +45,7 @@ public class JdbcAggregateForceCompletionHeaderTest extends AbstractJdbcAggregat getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force"); //now send the signal message to trigger completion of all groups, message should NOT be aggregated - template.sendBodyAndHeader("direct:start", "test5", Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true); + template.sendBodyAndProperty("direct:start", "test5", Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true); assertMockEndpointsSatisfied(); }