Repository: camel Updated Branches: refs/heads/master 109d8ecb4 -> 62debbe44
[CAMEL-8423] Enhance Aggregator to allow AggregationStrategy to have some level of control over if the aggregation is complete. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/62debbe4 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/62debbe4 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/62debbe4 Branch: refs/heads/master Commit: 62debbe44d5b3619f6b5493cb3f8750ecee41d65 Parents: 109d8ec Author: Daniel Kulp <dk...@apache.org> Authored: Tue Mar 3 12:50:37 2015 -0500 Committer: Daniel Kulp <dk...@apache.org> Committed: Tue Mar 3 12:50:37 2015 -0500 ---------------------------------------------------------------------- .../processor/aggregate/AggregateProcessor.java | 6 ++ .../aggregate/AggregationStrategy.java | 7 ++ ...egateAggregationStrategyIsPredicateTest.java | 74 ++++++++++++++++++++ ...gregationStrategyCompleteByPropertyTest.java | 71 +++++++++++++++++++ 4 files changed, 158 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/62debbe4/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java index ebc6b84..a1fa0ed 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java @@ -405,6 +405,9 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor * @return <tt>null</tt> if not completed, otherwise a String with the type that triggered the completion */ protected String isCompleted(String key, Exchange exchange) { + if 
(exchange.getProperty(AggregationStrategy.IS_COMPLETE) == Boolean.TRUE) { + return "strategy"; + } // batch consumer completion must always run first if (isCompletionFromBatchConsumer()) { batchConsumerCorrelationKeys.add(key); @@ -606,6 +609,9 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor } public Predicate getCompletionPredicate() { + if (completionPredicate == null && aggregationStrategy instanceof Predicate) { + return (Predicate)aggregationStrategy; + } return completionPredicate; } http://git-wip-us.apache.org/repos/asf/camel/blob/62debbe4/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java index e8cd90f..bf5f5be 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java @@ -47,6 +47,13 @@ public interface AggregationStrategy { // TODO: In Camel 3.0 we should move this to org.apache.camel package /** + * During the call to {@link #aggregate(org.apache.camel.Exchange, org.apache.camel.Exchange) aggregate}, if the + * AggregationStrategy determines that the aggregation is complete and should be sent on (example: size limits exceeded), + * then it can set the property on the returned Exchange to Boolean.TRUE to mark the aggregation as complete. 
+ */ + String IS_COMPLETE = "AggregationStrategy.IS_COMPLETE"; + + /** * Aggregates an old and new exchange together to create a single combined exchange * * @param oldExchange the oldest exchange (is <tt>null</tt> on first aggregation as we only have the new exchange) http://git-wip-us.apache.org/repos/asf/camel/blob/62debbe4/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateAggregationStrategyIsPredicateTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateAggregationStrategyIsPredicateTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateAggregationStrategyIsPredicateTest.java new file mode 100644 index 0000000..3747760 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateAggregationStrategyIsPredicateTest.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.processor.aggregator; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Predicate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.processor.aggregate.AggregationStrategy; + +/** + * Tests an aggregation route whose only configured completion condition is the AggregationStrategy itself, which also implements Predicate and matches once the combined body is at least 5 characters long. + */ +public class AggregateAggregationStrategyIsPredicateTest extends ContextTestSupport { + + public void testAggregateCompletionAware() throws Exception { + getMockEndpoint("mock:aggregated").expectedBodiesReceived("A+B+C", "X+Y+ZZZZ"); + + template.sendBodyAndHeader("direct:start", "A", "id", 123); + template.sendBodyAndHeader("direct:start", "B", "id", 123); + template.sendBodyAndHeader("direct:start", "C", "id", 123); + template.sendBodyAndHeader("direct:start", "X", "id", 123); + template.sendBodyAndHeader("direct:start", "Y", "id", 123); + template.sendBodyAndHeader("direct:start", "ZZZZ", "id", 123); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .aggregate(header("id"), new MyCompletionStrategy()) + .to("mock:aggregated"); + } + }; + } + + private final class MyCompletionStrategy implements AggregationStrategy, Predicate { + + @Override + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + if (oldExchange == null) { + return newExchange; + } + + String oldBody = oldExchange.getIn().getBody(String.class); + String newBody = newExchange.getIn().getBody(String.class); + oldExchange.getIn().setBody(oldBody + "+" + newBody); + return oldExchange; + } + + @Override + public boolean matches(Exchange exchange) { + return exchange.getIn().getBody(String.class).length() >= 5; + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/62debbe4/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyCompleteByPropertyTest.java 
---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyCompleteByPropertyTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyCompleteByPropertyTest.java new file mode 100644 index 0000000..a3b5933 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyCompleteByPropertyTest.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.processor.aggregator; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.processor.aggregate.AggregationStrategy; + +/** + * Tests an aggregation route whose AggregationStrategy sets the AggregationStrategy.IS_COMPLETE property to true on the exchange it returns once the combined body is at least 5 characters long; the route also configures a completion timeout of 1000. + */ +public class AggregationStrategyCompleteByPropertyTest extends ContextTestSupport { + + public void testAggregateCompletionAware() throws Exception { + getMockEndpoint("mock:aggregated").expectedBodiesReceived("A+B+C", "X+Y+ZZZZ"); + + template.sendBodyAndHeader("direct:start", "A", "id", 123); + template.sendBodyAndHeader("direct:start", "B", "id", 123); + template.sendBodyAndHeader("direct:start", "C", "id", 123); + template.sendBodyAndHeader("direct:start", "X", "id", 123); + template.sendBodyAndHeader("direct:start", "Y", "id", 123); + template.sendBodyAndHeader("direct:start", "ZZZZ", "id", 123); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .aggregate(header("id"), new MyCompletionStrategy()).completionTimeout(1000) + .to("mock:aggregated"); + } + }; + } + + private final class MyCompletionStrategy implements AggregationStrategy { + + @Override + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + if (oldExchange == null) { + return newExchange; + } + + String body = oldExchange.getIn().getBody(String.class) + "+" + + newExchange.getIn().getBody(String.class); + oldExchange.getIn().setBody(body); + if (body.length() >= 5) { + oldExchange.setProperty(AggregationStrategy.IS_COMPLETE, true); + } + return oldExchange; + } + } +}