Repository: camel Updated Branches: refs/heads/master 7d91940bf -> 35df1198e
CAMEL-8081 Multicast Aggregator should keep processing the other exchanges which have not timed out Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2e985f9b Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2e985f9b Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2e985f9b Branch: refs/heads/master Commit: 2e985f9bdef328103e1b7eff2392d9e42b3d0312 Parents: 74b6c5c Author: Willem Jiang <willem.ji...@gmail.com> Authored: Thu Nov 27 15:13:58 2014 +0800 Committer: Willem Jiang <willem.ji...@gmail.com> Committed: Thu Nov 27 15:16:17 2014 +0800 ---------------------------------------------------------------------- .../camel/processor/MulticastProcessor.java | 5 +- .../MulticastParallelAllTimeoutAwareTest.java | 3 +- .../MulticastParallelTwoTimeoutMiddleTest.java | 83 ++++++++++++++++++++ 3 files changed, 86 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/2e985f9b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java index 1d579cd..38e70bb 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -442,10 +442,7 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor } } - if (future == null && timedOut) { - // we are timed out and no more tasks complete so break out - break; - } else if (future == null) { + if (future == null) { // timeout occurred AggregationStrategy strategy = getAggregationStrategy(null); if (strategy instanceof TimeoutAwareAggregationStrategy) { 
http://git-wip-us.apache.org/repos/asf/camel/blob/2e985f9b/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelAllTimeoutAwareTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelAllTimeoutAwareTest.java b/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelAllTimeoutAwareTest.java index b941f47..9cd5bb7 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelAllTimeoutAwareTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelAllTimeoutAwareTest.java @@ -42,7 +42,8 @@ public class MulticastParallelAllTimeoutAwareTest extends ContextTestSupport { assertMockEndpointsSatisfied(); assertNotNull(receivedExchange); - assertEquals(0, receivedIndex); + // Just make sure the MyAggregationStrategy is called for all the exchange + assertEquals(2, receivedIndex); assertEquals(3, receivedTotal); assertEquals(500, receivedTimeout); } http://git-wip-us.apache.org/repos/asf/camel/blob/2e985f9b/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTwoTimeoutMiddleTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTwoTimeoutMiddleTest.java b/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTwoTimeoutMiddleTest.java new file mode 100644 index 0000000..0eee62b --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTwoTimeoutMiddleTest.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.processor.aggregate.AggregationStrategy; + +/** + * @version + */ +public class MulticastParallelTwoTimeoutMiddleTest extends ContextTestSupport { + + public void testMulticastParallelTimeout() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + // B will timeout so we only get A and/or C + //mock.message(0).body().not(body().contains("B")); + + mock.expectedBodiesReceived("ACE"); + + getMockEndpoint("mock:A").expectedMessageCount(1); + getMockEndpoint("mock:B").expectedMessageCount(0); + getMockEndpoint("mock:C").expectedMessageCount(1); + getMockEndpoint("mock:D").expectedMessageCount(0); + getMockEndpoint("mock:E").expectedMessageCount(1); + + template.sendBody("direct:start", "Hello"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + // START SNIPPET: e1 + from("direct:start") + .multicast(new AggregationStrategy() { + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + if (oldExchange == null) { + return newExchange; + } + + String body = 
oldExchange.getIn().getBody(String.class); + oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class)); + return oldExchange; + } + }) + .parallelProcessing().timeout(250).to("direct:a", "direct:b", "direct:c", "direct:d", "direct:e") + // use end to indicate end of multicast route + .end() + .to("mock:result"); + + from("direct:a").to("mock:A").setBody(constant("A")); + + from("direct:b").delay(1000).to("mock:B").setBody(constant("B")); + + from("direct:c").to("mock:C").setBody(constant("C")); + + from("direct:d").delay(1000).to("mock:D").setBody(constant("D")); + + from("direct:e").to("mock:E").setBody(constant("E")); + } + }; + } +}