Author: davsclaus Date: Fri Aug 6 06:55:20 2010 New Revision: 982890 URL: http://svn.apache.org/viewvc?rev=982890&view=rev Log: CAMEL-3023: Refined tineout for multicast to compute remaining time.
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeout2Test.java - copied, changed from r982878, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutTest.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeout3Test.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java camel/trunk/camel-core/src/test/resources/log4j.properties Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=982890&r1=982889&r2=982890&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Fri Aug 6 06:55:20 2010 @@ -261,8 +261,8 @@ public class MulticastProcessor extends // its to hard to do parallel async routing so we let the caller thread be synchronously // and have it pickup the replies and do the aggregation - // TODO: use a stopwatch to keep track of timeout left boolean timedOut = false; + final StopWatch watch = new StopWatch(); for (int i = 0; i < total.intValue(); i++) { Future<Exchange> future; if (timedOut) { @@ -270,9 +270,19 @@ public class MulticastProcessor extends // poll will return null if no tasks is present future = completion.poll(); } else if (timeout > 0) { - future = completion.poll(timeout, TimeUnit.MILLISECONDS); + long left = timeout - watch.taken(); + if (left < 0) { + left = 0; + } + if (LOG.isTraceEnabled()) { + LOG.trace("Polling completion task #" + i + " using timeout " + left + " millis."); + } + future = completion.poll(left, TimeUnit.MILLISECONDS); } else { // take will wait until the task is complete + if (LOG.isTraceEnabled()) { + LOG.trace("Polling completion task #" + i); + } future = completion.take(); } @@ -286,7 +296,8 @@ public class MulticastProcessor extends // notify the strategy we timed out ((TimeoutAwareAggregationStrategy) strategy).timeout(result.get(), i, total.intValue(), timeout); } else { - LOG.warn("Parallel processing timed out after " + timeout + " millis for number " + i + ". Cannot aggregate"); + // log a WARN we timed out since it will not be aggregated and the Exchange will be lost + LOG.warn("Parallel processing timed out after " + timeout + " millis for number " + i + ". This task will be cancelled and will not be aggregated."); } timedOut = true; } else { Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeout2Test.java (from r982878, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeout2Test.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeout2Test.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutTest.java&r1=982878&r2=982890&rev=982890&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeout2Test.java Fri Aug 6 06:55:20 2010 @@ -25,20 +25,20 @@ import org.apache.camel.processor.aggreg /** * @version $Revision$ */ -public class MulticastParallelTimeoutTest extends ContextTestSupport { +public class MulticastParallelTimeout2Test extends ContextTestSupport { public void testMulticastParallelTimeout() throws Exception { MockEndpoint mock = getMockEndpoint("mock:result"); - // A will timeout so we only get B and C - mock.expectedBodiesReceived("BC"); + // B will timeout so we only get A and C + mock.expectedBodiesReceived("AC"); - getMockEndpoint("mock:A").expectedMessageCount(0); - getMockEndpoint("mock:B").expectedMessageCount(1); + getMockEndpoint("mock:A").expectedMessageCount(1); + getMockEndpoint("mock:B").expectedMessageCount(0); getMockEndpoint("mock:C").expectedMessageCount(1); template.sendBody("direct:start", "Hello"); - // wait at least longer than the delay in A so we can ensure its being cancelled + // wait at least longer than the delay in B so we can ensure its being cancelled // and wont continue routing Thread.sleep(4000); @@ -68,9 +68,9 @@ public class MulticastParallelTimeoutTes .end() .to("mock:result"); - from("direct:a").delay(3000).to("mock:A").setBody(constant("A")); + from("direct:a").to("mock:A").setBody(constant("A")); - from("direct:b").to("mock:B").setBody(constant("B")); + from("direct:b").delay(3000).to("mock:B").setBody(constant("B")); from("direct:c").delay(500).to("mock:C").setBody(constant("C")); // END SNIPPET: e1 Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeout3Test.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeout3Test.java?rev=982890&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeout3Test.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeout3Test.java Fri Aug 6 06:55:20 2010 @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.processor.aggregate.AggregationStrategy; + +/** + * @version $Revision: 982655 $ + */ +public class MulticastParallelTimeout3Test extends ContextTestSupport { + + public void testMulticastParallelTimeout() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + // C will timeout so we only get A and B + mock.expectedBodiesReceived("AB"); + + getMockEndpoint("mock:A").expectedMessageCount(1); + getMockEndpoint("mock:B").expectedMessageCount(1); + getMockEndpoint("mock:C").expectedMessageCount(0); + + template.sendBody("direct:start", "Hello"); + + // wait at least longer than the delay in C so we can ensure its being cancelled + // and wont continue routing + Thread.sleep(3000); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + // START SNIPPET: e1 + from("direct:start") + .multicast(new AggregationStrategy() { + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + if (oldExchange == null) { + return newExchange; + } + + String body = oldExchange.getIn().getBody(String.class); + oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class)); + return oldExchange; + } + }) + .parallelProcessing().timeout(4000).to("direct:a", "direct:b", "direct:c") + // use end to indicate end of multicast route + .end() + .to("mock:result"); + + from("direct:a").delay(2000).to("mock:A").setBody(constant("A")); + + from("direct:b").delay(3000).to("mock:B").setBody(constant("B")); + + from("direct:c").delay(6000).to("mock:C").setBody(constant("C")); + // END SNIPPET: e1 + } + }; + } +} Modified: camel/trunk/camel-core/src/test/resources/log4j.properties URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/resources/log4j.properties?rev=982890&r1=982889&r2=982890&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/resources/log4j.properties (original) +++ camel/trunk/camel-core/src/test/resources/log4j.properties Fri Aug 6 06:55:20 2010 @@ -20,13 +20,18 @@ # log4j.rootLogger=INFO, file -log4j.logger.org.apache.activemq.spring=WARN +log4j.logger.org.apache.camel.impl.converter=WARN +log4j.logger.org.apache.camel.management=WARN +log4j.logger.org.apache.camel.impl.DefaultPackageScanClassResolver=WARN +#log4j.logger.org.apache.camel.impl.converter.DefaultTypeConverter=TRACE + #log4j.logger.org.apache.camel=DEBUG #log4j.logger.org.apache.camel.component=TRACE #log4j.logger.org.apache.camel.component.seda=TRACE #log4j.logger.org.apache.camel.impl.DefaultUnitOfWork=TRACE #log4j.logger.org.apache.camel.component.mock=DEBUG #log4j.logger.org.apache.camel.component.file=TRACE +#log4j.logger.org.apache.camel.processor.DefaultErrorHandler=TRACE #log4j.logger.org.apache.camel.processor.Pipeline=TRACE #log4j.logger.org.apache.camel.processor.MulticastProcessor=TRACE #log4j.logger.org.apache.camel.processor.RecipientList=TRACE @@ -36,15 +41,9 @@ log4j.logger.org.apache.activemq.spring= #log4j.logger.org.apache.camel.processor.loadbalancer=TRACE #log4j.logger.org.apache.camel.processor.Delayer=TRACE #log4j.logger.org.apache.camel.processor.Throttler=TRACE -log4j.logger.org.apache.camel.impl.converter=WARN -log4j.logger.org.apache.camel.management=WARN -log4j.logger.org.apache.camel.impl.DefaultPackageScanClassResolver=WARN #log4j.logger.org.apache.camel.impl=TRACE #log4j.logger.org.apache.camel.util.FileUtil=TRACE #log4j.logger.org.apache.camel.util.AsyncProcessorHelper=TRACE -#log4j.logger.org.apache.camel.impl.converter.DefaultTypeConverter=TRACE -#log4j.logger.org.apache.camel.processor.DefaultErrorHandler=TRACE -#log4j.logger.org.apache.camel.processor.Pipeline=TRACE # CONSOLE appender not used by default log4j.appender.out=org.apache.log4j.ConsoleAppender