Author: davsclaus Date: Sun Jan 9 10:22:45 2011 New Revision: 1056903 URL: http://svn.apache.org/viewvc?rev=1056903&view=rev Log: CAMEL-3497: Fixed subtle issue with aggregation task blocking.
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=1056903&r1=1056902&r2=1056903&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Sun Jan 9 10:22:45 2011 @@ -145,7 +145,6 @@ public class MulticastProcessor extends private final boolean streaming; private final boolean stopOnException; private final ExecutorService executorService; - private ExecutorService aggregationExecutorService; private final long timeout; private final ConcurrentMap<PreparedErrorHandler, Processor> errorHandlers = new ConcurrentHashMap<PreparedErrorHandler, Processor>(); @@ -234,6 +233,8 @@ public class MulticastProcessor extends protected void doProcessParallel(final Exchange original, final AtomicExchange result, final Iterable<ProcessorExchangePair> pairs, final boolean streaming, final AsyncCallback callback) throws Exception { + ObjectHelper.notNull(executorService, "ExecutorService", this); + final CompletionService<Exchange> completion; if (streaming) { // execute tasks in parallel+streaming and aggregate in the order they are finished (out of order sequence) @@ -259,8 +260,8 @@ public class MulticastProcessor extends AggregateOnTheFlyTask task = new AggregateOnTheFlyTask(result, original, total, completion, running, aggregationOnTheFlyDone, allTasksSubmitted, executionException); - // and start the task using the aggregation execution service - aggregationExecutorService.submit(task); + // and start the aggregation task so we can aggregate on-the-fly + executorService.submit(task); } LOG.trace("Starting to submit parallel tasks"); @@ -421,11 +422,15 @@ public class MulticastProcessor extends } future = completion.poll(left, TimeUnit.MILLISECONDS); } else { - // take will wait until the task is complete if (LOG.isTraceEnabled()) { LOG.trace("Polling completion task #" + aggregated); } - future = completion.take(); + // we must not block so poll every second + future = completion.poll(1, TimeUnit.SECONDS); + if (future == null) { + // and continue loop which will recheck if we are done + continue; + } } if (future == null && timedOut) { @@ -866,9 +871,6 @@ public class MulticastProcessor extends if (timeout > 0 && !isParallelProcessing()) { throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled"); } - if (isParallelProcessing()) { - aggregationExecutorService = getCamelContext().getExecutorServiceStrategy().newCachedThreadPool(this, "AggregationTask"); - } ServiceHelper.startServices(processors); } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java?rev=1056903&r1=1056902&r2=1056903&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java Sun Jan 9 10:22:45 2011 @@ -111,6 +111,15 @@ public class RecipientList extends Servi isParallelProcessing(), getExecutorService(), isStreaming(), isStopOnException(), getTimeout()); rlp.setIgnoreInvalidEndpoints(isIgnoreInvalidEndpoints()); + // start the service + try { + ServiceHelper.startService(rlp); + } catch (Exception e) { + exchange.setException(e); + callback.done(true); + return true; + } + // now let the multicast process the exchange return AsyncProcessorHelper.process(rlp, exchange, callback); } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java?rev=1056903&r1=1056902&r2=1056903&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java Sun Jan 9 10:22:45 2011 @@ -59,6 +59,9 @@ public final class ServiceHelper { * Starts all of the given services */ public static void startServices(Object... services) throws Exception { + if (services == null) { + return; + } for (Object value : services) { startService(value); } @@ -68,6 +71,9 @@ public final class ServiceHelper { * Starts all of the given services */ public static void startServices(Collection<?> services) throws Exception { + if (services == null) { + return; + } for (Object value : services) { if (value instanceof Service) { Service service = (Service)value; @@ -83,6 +89,9 @@ public final class ServiceHelper { * Stops all of the given services, throwing the first exception caught */ public static void stopServices(Object... services) throws Exception { + if (services == null) { + return; + } List<Object> list = Arrays.asList(services); stopServices(list); } @@ -106,6 +115,9 @@ public final class ServiceHelper { * Stops all of the given services, throwing the first exception caught */ public static void stopServices(Collection<?> services) throws Exception { + if (services == null) { + return; + } Exception firstException = null; for (Object value : services) { if (value instanceof Service) { @@ -134,6 +146,9 @@ public final class ServiceHelper { * Stops and shutdowns all of the given services, throwing the first exception caught */ public static void stopAndShutdownServices(Object... services) throws Exception { + if (services == null) { + return; + } List<Object> list = Arrays.asList(services); stopAndShutdownServices(list); } @@ -163,6 +178,9 @@ public final class ServiceHelper { * Stops and shutdowns all of the given services, throwing the first exception caught */ public static void stopAndShutdownServices(Collection<?> services) throws Exception { + if (services == null) { + return; + } Exception firstException = null; for (Object value : services) { @@ -194,6 +212,9 @@ public final class ServiceHelper { } public static void resumeServices(Collection<?> services) throws Exception { + if (services == null) { + return; + } Exception firstException = null; for (Object value : services) { if (value instanceof Service) { @@ -254,6 +275,9 @@ public final class ServiceHelper { } public static void suspendServices(Collection<?> services) throws Exception { + if (services == null) { + return; + } Exception firstException = null; for (Object value : services) { if (value instanceof Service) {