CAMEL-7528: Aggregate EIP now allows to wait and complete all pending exchanges when stopping. For example when using a memory based repository to drain the repo before stopping.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e81a4ca6 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e81a4ca6 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e81a4ca6 Branch: refs/heads/master Commit: e81a4ca686b1db487bc94abc0e287ea8012a0e1a Parents: 1a594a8 Author: Claus Ibsen <davscl...@apache.org> Authored: Wed Aug 12 16:33:59 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Aug 12 16:41:24 2015 +0200 ---------------------------------------------------------------------- .../mbean/ManagedAggregateProcessorMBean.java | 3 + .../camel/impl/DefaultShutdownStrategy.java | 32 ++++++++--- .../mbean/ManagedAggregateProcessor.java | 4 ++ .../apache/camel/model/AggregateDefinition.java | 29 +++++++++- .../processor/aggregate/AggregateProcessor.java | 30 +++++++++- .../AggregateCompleteAllOnStopTest.java | 58 ++++++++++++++++++++ 6 files changed, 147 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/e81a4ca6/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java index 07c1d21..4944422 100644 --- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java +++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java @@ -78,6 +78,9 @@ public interface ManagedAggregateProcessorMBean extends ManagedProcessorMBean { @ManagedAttribute(description = "Indicates to complete all current aggregated exchanges when the context is stopped") boolean isForceCompletionOnStop(); + @ManagedAttribute(description = "Indicates to wait to complete all current and partial (pending) aggregated exchanges when the context is stopped") + boolean isCompleteAllOnStop(); + @ManagedAttribute(description = "Number of completed exchanges which are currently in-flight") int getInProgressCompleteExchanges(); http://git-wip-us.apache.org/repos/asf/camel/blob/e81a4ca6/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java index eed2235..4d1395e 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java @@ -595,13 +595,7 @@ public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownS for (RouteStartupOrder order : routes) { int inflight = context.getInflightRepository().size(order.getRoute().getId()); - for (Consumer consumer : order.getInputs()) { - // include any additional pending exchanges on some consumers which may have internal - // memory queues such as seda - if (consumer instanceof ShutdownAware) { - inflight += ((ShutdownAware) consumer).getPendingExchangesSize(); - } - } + inflight += getPendingInflightExchanges(order); if (inflight > 0) { String routeId = order.getRoute().getId(); routeInflight.put(routeId, inflight); @@ -679,6 +673,30 @@ public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownS } /** + * Calculates the total number of inflight exchanges for the given route + * + * @param order the route + * @return number of inflight exchanges + */ + protected static int getPendingInflightExchanges(RouteStartupOrder order) { + int inflight = 0; + + // the consumer is the 1st service so we always get the consumer + // the child services are EIPs in the routes which may also have pending + // inflight exchanges (such as the aggregator) + for (Service service : order.getServices()) { + Set<Service> children = ServiceHelper.getChildServices(service); + for (Service child : children) { + if (child instanceof ShutdownAware) { + inflight += ((ShutdownAware) child).getPendingExchangesSize(); + } + } + } + + return inflight; + } + + /** * Logs information about the inflight exchanges * * @param infoLevel <tt>true</tt> to log at INFO level, <tt>false</tt> to log at DEBUG level http://git-wip-us.apache.org/repos/asf/camel/blob/e81a4ca6/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java index a00c63e..e4c167e 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java @@ -159,6 +159,10 @@ public class ManagedAggregateProcessor extends ManagedProcessor implements Manag return processor.isCompletionFromBatchConsumer(); } + public boolean isCompleteAllOnStop() { + return processor.isCompleteAllOnStop(); + } + public int getInProgressCompleteExchanges() { return processor.getInProgressCompleteExchanges(); } http://git-wip-us.apache.org/repos/asf/camel/blob/e81a4ca6/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java index 4b89a55..5573842 100644 --- a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java @@ -44,7 +44,6 @@ import org.apache.camel.processor.aggregate.OptimisticLockRetryPolicy; import org.apache.camel.spi.AggregationRepository; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.RouteContext; -import org.apache.camel.util.CamelContextHelper; import org.apache.camel.util.concurrent.SynchronousExecutorService; /** @@ -115,6 +114,8 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition private Boolean discardOnCompletionTimeout; @XmlAttribute private Boolean forceCompletionOnStop; + @XmlAttribute + private Boolean completeAllOnStop; @XmlTransient private AggregateController aggregateController; @XmlAttribute @@ -264,6 +265,9 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition if (getForceCompletionOnStop() != null) { answer.setForceCompletionOnStop(getForceCompletionOnStop()); } + if (getCompleteAllOnStop() != null) { + answer.setCompleteAllOnStop(getCompleteAllOnStop()); + } if (optimisticLockRetryPolicy == null) { if (getOptimisticLockRetryPolicyDefinition() != null) { answer.setOptimisticLockRetryPolicy(getOptimisticLockRetryPolicyDefinition().createOptimisticLockRetryPolicy()); @@ -623,6 +627,14 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition this.forceCompletionOnStop = forceCompletionOnStop; } + public Boolean getCompleteAllOnStop() { + return completeAllOnStop; + } + + public void setCompleteAllOnStop(Boolean completeAllOnStop) { + this.completeAllOnStop = completeAllOnStop; + } + public AggregateController getAggregateController() { return aggregateController; } @@ -867,6 +879,21 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition } /** + * Indicates to wait to complete all current and partial (pending) aggregated exchanges when the context is stopped. + * <p/> + * This also means that we will wait for all pending exchanges which are stored in the aggregation repository + * to complete so the repository is empty before we can stop. + * <p/> + * You may want to enable this when using the memory based aggregation repository that is memory based only, + * and do not store data on disk. When this option is enabled, then the aggregator is waiting to complete + * all those exchanges before its stopped, when stopping CamelContext or the route using it. + */ + public AggregateDefinition completeAllOnStop() { + setCompleteAllOnStop(true); + return this; + } + + /** * When aggregated are completed they are being send out of the aggregator. * This option indicates whether or not Camel should use a thread pool with multiple threads for concurrency. * If no custom thread pool has been specified then Camel creates a default pool with 10 concurrent threads. http://git-wip-us.apache.org/repos/asf/camel/blob/e81a4ca6/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java index 4e0dbca..4e3403c 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java @@ -44,6 +44,7 @@ import org.apache.camel.NoSuchEndpointException; import org.apache.camel.Predicate; import org.apache.camel.Processor; import org.apache.camel.ProducerTemplate; +import org.apache.camel.ShutdownRunningTask; import org.apache.camel.TimeoutMap; import org.apache.camel.Traceable; import org.apache.camel.spi.AggregationRepository; @@ -51,6 +52,7 @@ import org.apache.camel.spi.ExceptionHandler; import org.apache.camel.spi.IdAware; import org.apache.camel.spi.OptimisticLockingAggregationRepository; import org.apache.camel.spi.RecoverableAggregationRepository; +import org.apache.camel.spi.ShutdownAware; import org.apache.camel.spi.ShutdownPrepared; import org.apache.camel.spi.Synchronization; import org.apache.camel.support.DefaultTimeoutMap; @@ -81,7 +83,7 @@ import org.slf4j.LoggerFactory; * and older prices are discarded). Another idea is to combine line item messages * together into a single invoice message. */ -public class AggregateProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable, ShutdownPrepared, IdAware { +public class AggregateProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable, ShutdownPrepared, ShutdownAware, IdAware { public static final String AGGREGATE_TIMEOUT_CHECKER = "AggregateTimeoutChecker"; @@ -204,6 +206,7 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor private AtomicInteger batchConsumerCounter = new AtomicInteger(); private boolean discardOnCompletionTimeout; private boolean forceCompletionOnStop; + private boolean completeAllOnStop; private ProducerTemplate deadLetterProducerTemplate; @@ -891,6 +894,10 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor this.completionFromBatchConsumer = completionFromBatchConsumer; } + public boolean isCompleteAllOnStop() { + return completeAllOnStop; + } + public ExceptionHandler getExceptionHandler() { return exceptionHandler; } @@ -935,6 +942,10 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor this.forceCompletionOnStop = forceCompletionOnStop; } + public void setCompleteAllOnStop(boolean completeAllOnStop) { + this.completeAllOnStop = completeAllOnStop; + } + public void setTimeoutCheckerExecutorService(ScheduledExecutorService timeoutCheckerExecutorService) { this.timeoutCheckerExecutorService = timeoutCheckerExecutorService; } @@ -1380,6 +1391,23 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor } } + @Override + public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) { + // not in use + return true; + } + + @Override + public int getPendingExchangesSize() { + if (completeAllOnStop) { + // we want to regard all pending exchanges in the repo as inflight + Set<String> keys = getAggregationRepository().getKeys(); + return keys != null ? keys.size() : 0; + } else { + return 0; + } + } + private void doForceCompletionOnStop() { int expected = forceCompletionOfAllGroups(); http://git-wip-us.apache.org/repos/asf/camel/blob/e81a4ca6/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompleteAllOnStopTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompleteAllOnStopTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompleteAllOnStopTest.java new file mode 100644 index 0000000..e2d1405 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompleteAllOnStopTest.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregator; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.processor.BodyInAggregatingStrategy; +import org.apache.camel.processor.aggregate.MemoryAggregationRepository; + +/** + * @version + */ +public class AggregateCompleteAllOnStopTest extends ContextTestSupport { + + public void testCompleteAllOnStop() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:aggregated"); + mock.expectedBodiesReceived("A+B", "C"); + + // we only send 3, but we get 2 exchanges completed when stopping + // as we tell it to complete all on stop + template.sendBodyAndHeader("seda:start", "A", "id", "foo"); + template.sendBodyAndHeader("seda:start", "B", "id", "foo"); + template.sendBodyAndHeader("seda:start", "C", "id", "foo"); + + context.stopRoute("foo"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("seda:start").routeId("foo") + .aggregate(header("id"), new BodyInAggregatingStrategy()).aggregationRepository(new MemoryAggregationRepository()) + .completionSize(2).completionTimeout(2000).completeAllOnStop() + .to("mock:aggregated"); + } + }; + } + +}