Author: davsclaus
Date: Sun Nov 14 13:07:04 2010
New Revision: 1034992

URL: http://svn.apache.org/viewvc?rev=1034992&view=rev
Log:
CAMEL-3337: Aggregator uses a synchronous thread pool facade to process 
completed aggregated exchanges using the caller thread. This avoids any 
intermediate task queue, which could fill up due to a slow producer and cause 
memory issues.

Added:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/SynchronousExecutorService.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/SynchronousExecutorServiceTest.java
Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateParallelProcessingTest.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/ExecutorServiceHelperTest.java
    camel/trunk/camel-core/src/test/resources/log4j.properties

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java?rev=1034992&r1=1034991&r2=1034992&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
 Sun Nov 14 13:07:04 2010
@@ -255,7 +255,7 @@ public class DefaultExecutorServiceStrat
     }
 
     public ExecutorService newSynchronousThreadPool(Object source, String 
name) {
-        ExecutorService answer = 
ExecutorServiceHelper.newSynchronousThreadPool(threadNamePattern, name);
+        ExecutorService answer = 
ExecutorServiceHelper.newSynchronousThreadPool();
         onThreadPoolCreated(answer);
 
         if (LOG.isDebugEnabled()) {

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java?rev=1034992&r1=1034991&r2=1034992&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
 Sun Nov 14 13:07:04 2010
@@ -159,8 +159,8 @@ public class AggregateDefinition extends
                 // we are running in parallel so create a cached thread pool 
which grows/shrinks automatic
                 executorService = 
routeContext.getCamelContext().getExecutorServiceStrategy().newDefaultThreadPool(this,
 "Aggregator");
             } else {
-                // use a single threaded if we are not running in parallel
-                executorService = 
routeContext.getCamelContext().getExecutorServiceStrategy().newSingleThreadExecutor(this,
 "Aggregator");
+                // use a synchronous thread pool if we are not running in 
parallel (will always use caller thread)
+                executorService = 
routeContext.getCamelContext().getExecutorServiceStrategy().newSynchronousThreadPool(this,
 "Aggregator");
             }
         }
         AggregateProcessor answer = new 
AggregateProcessor(routeContext.getCamelContext(), processor, correlation, 
strategy, executorService);

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java?rev=1034992&r1=1034991&r2=1034992&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
 Sun Nov 14 13:07:04 2010
@@ -171,17 +171,14 @@ public final class ExecutorServiceHelper
     }
 
     /**
-     * Creates a new synchronous thread pool which executes the task in the 
caller thread (no task queue)
-     * <p/>
-     * Uses a {@link java.util.concurrent.SynchronousQueue} queue as task 
queue.
+     * Creates a new synchronous executor service which always executes the 
task in the call thread
+     * (its just a thread pool facade)
      *
-     * @param pattern      pattern of the thread name
-     * @param name         ${name} in the pattern name
      * @return the created pool
+     * @see org.apache.camel.util.concurrent.SynchronousExecutorService
      */
-    public static ExecutorService newSynchronousThreadPool(final String 
pattern, final String name) {
-        return ExecutorServiceHelper.newThreadPool(pattern, name, 0, 0, 60,
-                TimeUnit.SECONDS, 0, new 
ThreadPoolExecutor.CallerRunsPolicy(), true);
+    public static ExecutorService newSynchronousThreadPool() {
+        return new SynchronousExecutorService();
     }
 
     /**

Added: 
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/SynchronousExecutorService.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/SynchronousExecutorService.java?rev=1034992&view=auto
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/SynchronousExecutorService.java
 (added)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/SynchronousExecutorService.java
 Sun Nov 14 13:07:04 2010
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+import java.util.List;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A synchronous {@link java.util.concurrent.ExecutorService} which always 
invokes
+ * the task in the caller thread (just a thread pool facade).
+ * <p/>
+ * There is no task queue, and no thread pool. The task will thus always be 
executed
+ * by the caller thread in a fully synchronous method invocation.
+ * <p/>
+ * This implementation is very simple and does not support waiting for tasks 
to complete during shutdown.
+ *
+ * @version $Revision$
+ */
+public class SynchronousExecutorService extends AbstractExecutorService {
+
+    private volatile boolean shutdown;
+
+    public void shutdown() {
+        shutdown = true;
+    }
+
+    public List<Runnable> shutdownNow() {
+        // not implemented
+        return null;
+    }
+
+    public boolean isShutdown() {
+        return shutdown;
+    }
+
+    public boolean isTerminated() {
+        return shutdown;
+    }
+
+    public boolean awaitTermination(long time, TimeUnit unit) throws 
InterruptedException {
+        // not implemented
+        return true;
+    }
+
+    public void execute(Runnable runnable) {
+        // run the task synchronously
+        runnable.run();
+    }
+
+}

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateParallelProcessingTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateParallelProcessingTest.java?rev=1034992&r1=1034991&r2=1034992&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateParallelProcessingTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateParallelProcessingTest.java
 Sun Nov 14 13:07:04 2010
@@ -37,7 +37,7 @@ public class AggregateParallelProcessing
                 from("direct:start")
                     .aggregate(header("id"), new BodyInAggregatingStrategy())
                         
.eagerCheckCompletion().completionPredicate(body().isEqualTo("END")).parallelProcessing()
-                        .to("mock:result");
+                        .to("log:result", "mock:result");
             }
         });
         context.start();
@@ -62,7 +62,7 @@ public class AggregateParallelProcessing
                 from("direct:start")
                     .aggregate(header("id"), new BodyInAggregatingStrategy())
                         
.eagerCheckCompletion().completionPredicate(body().isEqualTo("END"))
-                        .to("mock:result");
+                        .to("log:result", "mock:result");
             }
         });
         context.start();

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/ExecutorServiceHelperTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/ExecutorServiceHelperTest.java?rev=1034992&r1=1034991&r2=1034992&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/ExecutorServiceHelperTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/ExecutorServiceHelperTest.java
 Sun Nov 14 13:07:04 2010
@@ -66,7 +66,7 @@ public class ExecutorServiceHelperTest e
     }
 
     public void testNewSynchronousThreadPool() {
-        ExecutorService pool = 
ExecutorServiceHelper.newSynchronousThreadPool("MyPool ${name}", "foo");
+        ExecutorService pool = 
ExecutorServiceHelper.newSynchronousThreadPool();
         assertNotNull(pool);
     }
 

Added: 
camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/SynchronousExecutorServiceTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/SynchronousExecutorServiceTest.java?rev=1034992&view=auto
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/SynchronousExecutorServiceTest.java
 (added)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/SynchronousExecutorServiceTest.java
 Sun Nov 14 13:07:04 2010
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+import java.util.concurrent.ExecutorService;
+
+import junit.framework.TestCase;
+
+/**
+ * @version $Revision$
+ */
+public class SynchronousExecutorServiceTest extends TestCase {
+
+    private static boolean invoked;
+    private static String name1;
+    private static String name2;
+
+    public void testSynchronousExecutorService() throws Exception {
+        name1 = Thread.currentThread().getName();
+
+        ExecutorService service = new SynchronousExecutorService();
+        service.execute(new Runnable() {
+            public void run() {
+                invoked = true;
+                name2 = Thread.currentThread().getName();
+            }
+        });
+
+        assertTrue("Should have been invoked", invoked);
+        assertEquals("Should use same thread", name1, name2);
+    }
+
+    public void testSynchronousExecutorServiceShutdown() throws Exception {
+        ExecutorService service = new SynchronousExecutorService();
+        service.execute(new Runnable() {
+            public void run() {
+                invoked = true;
+            }
+        });
+        service.shutdown();
+
+        assertTrue(service.isShutdown());
+        assertTrue(service.isTerminated());
+    }
+}

Modified: camel/trunk/camel-core/src/test/resources/log4j.properties
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/resources/log4j.properties?rev=1034992&r1=1034991&r2=1034992&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/resources/log4j.properties (original)
+++ camel/trunk/camel-core/src/test/resources/log4j.properties Sun Nov 14 
13:07:04 2010
@@ -43,6 +43,7 @@ log4j.logger.org.apache.camel.impl.Defau
 #log4j.logger.org.apache.camel.processor.loadbalancer=TRACE
 #log4j.logger.org.apache.camel.processor.Delayer=TRACE
 #log4j.logger.org.apache.camel.processor.Throttler=TRACE
+#log4j.logger.org.apache.camel.processor.aggregate.AggregateProcessor=DEBUG
 #log4j.logger.org.apache.camel.impl=TRACE
 #log4j.logger.org.apache.camel.util.FileUtil=TRACE
 #log4j.logger.org.apache.camel.util.AsyncProcessorHelper=TRACE


Reply via email to