This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-3.0.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.0.x by this push: new 2deab6f CAMEL-14414: for aggregation group completion setting and removal is handled through methods to enforce code cohesion and avoid repetitions. fixed issue for CURRENT_GROUP and ALL_GROUPS flag removal. adapted tests and documentation (#3554) 2deab6f is described below commit 2deab6f89353e679e62533f9447bfb355703c7b2 Author: grigoni <gianandrea.rig...@gmail.com> AuthorDate: Sat Feb 8 09:29:25 2020 +0100 CAMEL-14414: for aggregation group completion setting and removal is handled through methods to enforce code cohesion and avoid repetitions. fixed issue for CURRENT_GROUP and ALL_GROUPS flag removal. adapted tests and documentation (#3554) --- .../LevelDBAggregateForceCompletionHeaderTest.java | 2 +- .../JdbcAggregateForceCompletionHeaderTest.java | 2 +- .../processor/aggregate/AggregateProcessor.java | 53 ++++++++++++++++------ .../src/main/docs/eips/aggregate-eip.adoc | 2 +- ...eCompletionHeaderInAggregationStrategyTest.java | 2 + .../AggregateForceCompletionHeaderTest.java | 3 +- .../AggregationStrategyCompleteByPropertyTest.java | 8 ++++ 7 files changed, 54 insertions(+), 18 deletions(-) diff --git a/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java b/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java index c033dcf..3c2f744 100644 --- a/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java +++ b/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java @@ -55,7 +55,7 @@ public class LevelDBAggregateForceCompletionHeaderTest extends CamelTestSupport getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force"); //now send the signal message to trigger completion of all groups, message should NOT be aggregated - template.sendBodyAndHeader("direct:start", "test5", Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true); + template.sendBodyAndProperty("direct:start", "test5", Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true); assertMockEndpointsSatisfied(); } diff --git a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java index d26cb59..ed6b157 100644 --- a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java +++ b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java @@ -45,7 +45,7 @@ public class JdbcAggregateForceCompletionHeaderTest extends AbstractJdbcAggregat getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force"); //now send the signal message to trigger completion of all groups, message should NOT be aggregated - template.sendBodyAndHeader("direct:start", "test5", Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true); + template.sendBodyAndProperty("direct:start", "test5", Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true); assertMockEndpointsSatisfied(); } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java index ad0f96d..81c0af2 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java @@ -313,7 +313,8 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat } //check for the special header to force completion of all groups (and ignore the exchange otherwise) - if (getAndRemoveBooleanHeader(exchange, Exchange.AGGREGATION_COMPLETE_ALL_GROUPS)) { + if (isCompleteAllGroups(exchange)) { + removeFlagCompleteAllGroups(exchange); forceCompletionOfAllGroups(); callback.done(false); return; @@ -376,10 +377,10 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat // the aggregated output runs in another unit of work Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false); - // remove the complete all groups headers as it should not be on the copy - copy.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP); - copy.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS); - copy.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE); + // remove the complete all groups flags as it should not be on the copy + removeFlagCompleteCurrentGroup(copy); + removeFlagCompleteAllGroups(copy); + removeFlagCompleteAllGroupsInclusive(copy); List<Exchange> aggregated = null; lock.lock(); @@ -398,23 +399,45 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat } // check for the special header to force completion of all groups (inclusive of the message) - if (getAndRemoveBooleanHeader(exchange, Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE)) { + if (isCompleteAllGroupsInclusive(exchange)) { + removeFlagCompleteAllGroupsInclusive(exchange); forceCompletionOfAllGroups(); } callback.done(false); } - protected boolean getBooleanProperty(Exchange exchange, String key) { - return camelContext.getTypeConverter().convertTo(boolean.class, exchange, exchange.getProperty(key)); + private Object removeFlagCompleteCurrentGroup(Exchange exchange) { + //before everywhere : return exchange.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP); + return exchange.removeProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP); } - protected boolean getAndRemoveBooleanProperty(Exchange exchange, String key) { - return camelContext.getTypeConverter().convertTo(boolean.class, exchange, exchange.removeProperty(key)); + private Boolean isCompleteCurrentGroup(Exchange exchange) { + return exchange.getProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP, false, boolean.class); } - protected boolean getAndRemoveBooleanHeader(Exchange exchange, String key) { - return camelContext.getTypeConverter().convertTo(boolean.class, exchange, exchange.getIn().removeHeader(key)); + private Object removeFlagCompleteAllGroups(Exchange exchange) { + Object removedHeader = exchange.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS); + Object removedProp = exchange.removeProperty(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS); + return removedHeader == null ? removedProp : removedHeader; + } + + private Boolean isCompleteAllGroups(Exchange exchange) { + boolean retVal = exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, false, boolean.class); + if (!retVal) { + // according to doc it is a property but it is sometimes read as header + // some test don't fail because they use the header expression which contains a fallback to properties + retVal = exchange.getProperty(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, false, boolean.class); + } + return retVal; + } + + private Object removeFlagCompleteAllGroupsInclusive(Exchange exchange) { + return exchange.getIn().removeHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE); + } + + private Boolean isCompleteAllGroupsInclusive(Exchange exchange) { + return exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE, false, boolean.class); } /** @@ -522,7 +545,8 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat } // check for the special exchange property to force completion of all groups - if (getAndRemoveBooleanProperty(answer, Exchange.AGGREGATION_COMPLETE_ALL_GROUPS)) { + if (isCompleteAllGroups(answer)) { + removeFlagCompleteAllGroups(answer); forceCompletionOfAllGroups(); } else if (isCompletionOnNewCorrelationGroup() && originalExchange == null) { // its a new group so force complete of all existing groups @@ -644,7 +668,8 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat } } - if (getBooleanProperty(exchange, Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP)) { + if (isCompleteCurrentGroup(exchange)) { + removeFlagCompleteCurrentGroup(exchange); return COMPLETED_BY_STRATEGY; } diff --git a/core/camel-core-engine/src/main/docs/eips/aggregate-eip.adoc b/core/camel-core-engine/src/main/docs/eips/aggregate-eip.adoc index 2708b1f..005d6b4 100644 --- a/core/camel-core-engine/src/main/docs/eips/aggregate-eip.adoc +++ b/core/camel-core-engine/src/main/docs/eips/aggregate-eip.adoc @@ -278,7 +278,7 @@ setting the property `Exchange.AGGREGATION_COMPLETE_ALL_GROUPS` to `true`. == Manually Force the Completion of All Aggregated Exchanges Immediately You can manually trigger completion of all current aggregated exchanges -by sending a message containing the header +by sending an exchange containing the property `Exchange.AGGREGATION_COMPLETE_ALL_GROUPS` set to `true`. The message is considered a signal message only, the message headers/contents will not be processed otherwise. diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderInAggregationStrategyTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderInAggregationStrategyTest.java index cced169..b6b9542 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderInAggregationStrategyTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderInAggregationStrategyTest.java @@ -30,6 +30,8 @@ public class AggregateForceCompletionHeaderInAggregationStrategyTest extends Con @Test public void testCompletePreviousOnNewGroup() throws Exception { getMockEndpoint("mock:aggregated").expectedBodiesReceived("AAA", "BB"); + getMockEndpoint("mock:aggregated").allMessages().header(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS).isNull(); + getMockEndpoint("mock:aggregated").allMessages().exchangeProperty(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS).isNull(); template.sendBody("direct:start", "A,A,A,B,B"); diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java index d438615..c785cd1 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java @@ -47,10 +47,11 @@ public class AggregateForceCompletionHeaderTest extends ContextTestSupport { getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4"); getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force"); getMockEndpoint("mock:aggregated").allMessages().header(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS).isNull(); + getMockEndpoint("mock:aggregated").allMessages().exchangeProperty(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS).isNull(); // now send the signal message to trigger completion of all groups, // message should NOT be aggregated - template.sendBodyAndHeader("direct:start", "test5", Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true); + template.sendBodyAndProperty("direct:start", "test5", Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true); assertMockEndpointsSatisfied(); } diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyCompleteByPropertyTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyCompleteByPropertyTest.java index 26e613b..0a55c4d 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyCompleteByPropertyTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyCompleteByPropertyTest.java @@ -32,6 +32,14 @@ public class AggregationStrategyCompleteByPropertyTest extends ContextTestSuppor result.message(0).exchangeProperty(Exchange.AGGREGATED_COMPLETED_BY).isEqualTo("strategy"); result.message(1).exchangeProperty(Exchange.AGGREGATED_COMPLETED_BY).isEqualTo("strategy"); + // org.apache.camel.builder.ExpressionBuilder.headerExpression(java.lang.String) is going to property fallback + // the test (without the fix) will fail into error: + // java.lang.AssertionError: Assertion error at index 0 on mock mock://aggregated with predicate: + // header(CamelAggregationCompleteCurrentGroup) is null evaluated as: true is null on Exchange[ID-MacBook-Pro-1578822701664-0-2] + getMockEndpoint("mock:aggregated").allMessages().header(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP).isNull(); + // according to manual + getMockEndpoint("mock:aggregated").allMessages().exchangeProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP).isNull(); + template.sendBodyAndHeader("direct:start", "A", "id", 123); template.sendBodyAndHeader("direct:start", "B", "id", 123); template.sendBodyAndHeader("direct:start", "C", "id", 123);