Author: davsclaus
Date: Fri Aug  6 06:55:20 2010
New Revision: 982890

URL: http://svn.apache.org/viewvc?rev=982890&view=rev
Log:
CAMEL-3023: Refined tineout for multicast to compute remaining time.

Added:
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeout2Test.java
      - copied, changed from r982878, 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutTest.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeout3Test.java
Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
    camel/trunk/camel-core/src/test/resources/log4j.properties

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=982890&r1=982889&r2=982890&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
 Fri Aug  6 06:55:20 2010
@@ -261,8 +261,8 @@ public class MulticastProcessor extends 
 
         // its to hard to do parallel async routing so we let the caller 
thread be synchronously
         // and have it pickup the replies and do the aggregation
-        // TODO: use a stopwatch to keep track of timeout left
         boolean timedOut = false;
+        final StopWatch watch = new StopWatch();
         for (int i = 0; i < total.intValue(); i++) {
             Future<Exchange> future;
             if (timedOut) {
@@ -270,9 +270,19 @@ public class MulticastProcessor extends 
                 // poll will return null if no tasks is present
                 future = completion.poll();
             } else if (timeout > 0) {
-                future = completion.poll(timeout, TimeUnit.MILLISECONDS);
+                long left = timeout - watch.taken();
+                if (left < 0) {
+                    left = 0;
+                }
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Polling completion task #" + i + " using 
timeout " + left + " millis.");
+                }
+                future = completion.poll(left, TimeUnit.MILLISECONDS);
             } else {
                 // take will wait until the task is complete
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Polling completion task #" + i);
+                }
                 future = completion.take();
             }
 
@@ -286,7 +296,8 @@ public class MulticastProcessor extends 
                     // notify the strategy we timed out
                     ((TimeoutAwareAggregationStrategy) 
strategy).timeout(result.get(), i, total.intValue(), timeout);
                 } else {
-                    LOG.warn("Parallel processing timed out after " + timeout 
+ " millis for number " + i + ". Cannot aggregate");
+                    // log a WARN we timed out since it will not be aggregated 
and the Exchange will be lost
+                    LOG.warn("Parallel processing timed out after " + timeout 
+ " millis for number " + i + ". This task will be cancelled and will not be 
aggregated.");
                 }
                 timedOut = true;
             } else {

Copied: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeout2Test.java
 (from r982878, 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutTest.java)
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeout2Test.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeout2Test.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutTest.java&r1=982878&r2=982890&rev=982890&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeout2Test.java
 Fri Aug  6 06:55:20 2010
@@ -25,20 +25,20 @@ import org.apache.camel.processor.aggreg
 /**
  * @version $Revision$
  */
-public class MulticastParallelTimeoutTest extends ContextTestSupport {
+public class MulticastParallelTimeout2Test extends ContextTestSupport {
 
     public void testMulticastParallelTimeout() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
-        // A will timeout so we only get B and C
-        mock.expectedBodiesReceived("BC");
+        // B will timeout so we only get A and C
+        mock.expectedBodiesReceived("AC");
 
-        getMockEndpoint("mock:A").expectedMessageCount(0);
-        getMockEndpoint("mock:B").expectedMessageCount(1);
+        getMockEndpoint("mock:A").expectedMessageCount(1);
+        getMockEndpoint("mock:B").expectedMessageCount(0);
         getMockEndpoint("mock:C").expectedMessageCount(1);
 
         template.sendBody("direct:start", "Hello");
 
-        // wait at least longer than the delay in A so we can ensure its being 
cancelled
+        // wait at least longer than the delay in B so we can ensure its being 
cancelled
         // and wont continue routing
         Thread.sleep(4000);
 
@@ -68,9 +68,9 @@ public class MulticastParallelTimeoutTes
                     .end()
                     .to("mock:result");
 
-                
from("direct:a").delay(3000).to("mock:A").setBody(constant("A"));
+                from("direct:a").to("mock:A").setBody(constant("A"));
 
-                from("direct:b").to("mock:B").setBody(constant("B"));
+                
from("direct:b").delay(3000).to("mock:B").setBody(constant("B"));
 
                 
from("direct:c").delay(500).to("mock:C").setBody(constant("C"));
                 // END SNIPPET: e1

Added: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeout3Test.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeout3Test.java?rev=982890&view=auto
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeout3Test.java
 (added)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeout3Test.java
 Fri Aug  6 06:55:20 2010
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+
+/**
+ * @version $Revision: 982655 $
+ */
+public class MulticastParallelTimeout3Test extends ContextTestSupport {
+
+    public void testMulticastParallelTimeout() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        // C will timeout so we only get A and B
+        mock.expectedBodiesReceived("AB");
+
+        getMockEndpoint("mock:A").expectedMessageCount(1);
+        getMockEndpoint("mock:B").expectedMessageCount(1);
+        getMockEndpoint("mock:C").expectedMessageCount(0);
+
+        template.sendBody("direct:start", "Hello");
+
+        // wait at least longer than the delay in C so we can ensure its being 
cancelled
+        // and wont continue routing
+        Thread.sleep(3000);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // START SNIPPET: e1
+                from("direct:start")
+                    .multicast(new AggregationStrategy() {
+                            public Exchange aggregate(Exchange oldExchange, 
Exchange newExchange) {
+                                if (oldExchange == null) {
+                                    return newExchange;
+                                }
+
+                                String body = 
oldExchange.getIn().getBody(String.class);
+                                oldExchange.getIn().setBody(body + 
newExchange.getIn().getBody(String.class));
+                                return oldExchange;
+                            }
+                        })
+                        .parallelProcessing().timeout(4000).to("direct:a", 
"direct:b", "direct:c")
+                    // use end to indicate end of multicast route
+                    .end()
+                    .to("mock:result");
+
+                
from("direct:a").delay(2000).to("mock:A").setBody(constant("A"));
+
+                
from("direct:b").delay(3000).to("mock:B").setBody(constant("B"));
+
+                
from("direct:c").delay(6000).to("mock:C").setBody(constant("C"));
+                // END SNIPPET: e1
+            }
+        };
+    }
+}

Modified: camel/trunk/camel-core/src/test/resources/log4j.properties
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/resources/log4j.properties?rev=982890&r1=982889&r2=982890&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/resources/log4j.properties (original)
+++ camel/trunk/camel-core/src/test/resources/log4j.properties Fri Aug  6 
06:55:20 2010
@@ -20,13 +20,18 @@
 #
 log4j.rootLogger=INFO, file
 
-log4j.logger.org.apache.activemq.spring=WARN
+log4j.logger.org.apache.camel.impl.converter=WARN
+log4j.logger.org.apache.camel.management=WARN
+log4j.logger.org.apache.camel.impl.DefaultPackageScanClassResolver=WARN
+#log4j.logger.org.apache.camel.impl.converter.DefaultTypeConverter=TRACE
+
 #log4j.logger.org.apache.camel=DEBUG
 #log4j.logger.org.apache.camel.component=TRACE
 #log4j.logger.org.apache.camel.component.seda=TRACE
 #log4j.logger.org.apache.camel.impl.DefaultUnitOfWork=TRACE
 #log4j.logger.org.apache.camel.component.mock=DEBUG
 #log4j.logger.org.apache.camel.component.file=TRACE
+#log4j.logger.org.apache.camel.processor.DefaultErrorHandler=TRACE
 #log4j.logger.org.apache.camel.processor.Pipeline=TRACE
 #log4j.logger.org.apache.camel.processor.MulticastProcessor=TRACE
 #log4j.logger.org.apache.camel.processor.RecipientList=TRACE
@@ -36,15 +41,9 @@ log4j.logger.org.apache.activemq.spring=
 #log4j.logger.org.apache.camel.processor.loadbalancer=TRACE
 #log4j.logger.org.apache.camel.processor.Delayer=TRACE
 #log4j.logger.org.apache.camel.processor.Throttler=TRACE
-log4j.logger.org.apache.camel.impl.converter=WARN
-log4j.logger.org.apache.camel.management=WARN
-log4j.logger.org.apache.camel.impl.DefaultPackageScanClassResolver=WARN
 #log4j.logger.org.apache.camel.impl=TRACE
 #log4j.logger.org.apache.camel.util.FileUtil=TRACE
 #log4j.logger.org.apache.camel.util.AsyncProcessorHelper=TRACE
-#log4j.logger.org.apache.camel.impl.converter.DefaultTypeConverter=TRACE
-#log4j.logger.org.apache.camel.processor.DefaultErrorHandler=TRACE
-#log4j.logger.org.apache.camel.processor.Pipeline=TRACE
 
 # CONSOLE appender not used by default
 log4j.appender.out=org.apache.log4j.ConsoleAppender


Reply via email to