CAMEL-7528: Aggregate EIP can now wait for and complete all pending exchanges 
when stopping. For example, when using a memory based repository, this drains 
the repo before stopping.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e81a4ca6
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e81a4ca6
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e81a4ca6

Branch: refs/heads/master
Commit: e81a4ca686b1db487bc94abc0e287ea8012a0e1a
Parents: 1a594a8
Author: Claus Ibsen <davscl...@apache.org>
Authored: Wed Aug 12 16:33:59 2015 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Wed Aug 12 16:41:24 2015 +0200

----------------------------------------------------------------------
 .../mbean/ManagedAggregateProcessorMBean.java   |  3 +
 .../camel/impl/DefaultShutdownStrategy.java     | 32 ++++++++---
 .../mbean/ManagedAggregateProcessor.java        |  4 ++
 .../apache/camel/model/AggregateDefinition.java | 29 +++++++++-
 .../processor/aggregate/AggregateProcessor.java | 30 +++++++++-
 .../AggregateCompleteAllOnStopTest.java         | 58 ++++++++++++++++++++
 6 files changed, 147 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e81a4ca6/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
 
b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
index 07c1d21..4944422 100644
--- 
a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
+++ 
b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
@@ -78,6 +78,9 @@ public interface ManagedAggregateProcessorMBean extends 
ManagedProcessorMBean {
     @ManagedAttribute(description = "Indicates to complete all current 
aggregated exchanges when the context is stopped")
     boolean isForceCompletionOnStop();
 
+    @ManagedAttribute(description = "Indicates to wait to complete all current 
and partial (pending) aggregated exchanges when the context is stopped")
+    boolean isCompleteAllOnStop();
+
     @ManagedAttribute(description = "Number of completed exchanges which are 
currently in-flight")
     int getInProgressCompleteExchanges();
 

http://git-wip-us.apache.org/repos/asf/camel/blob/e81a4ca6/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java 
b/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
index eed2235..4d1395e 100644
--- 
a/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
+++ 
b/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
@@ -595,13 +595,7 @@ public class DefaultShutdownStrategy extends 
ServiceSupport implements ShutdownS
 
                 for (RouteStartupOrder order : routes) {
                     int inflight = 
context.getInflightRepository().size(order.getRoute().getId());
-                    for (Consumer consumer : order.getInputs()) {
-                        // include any additional pending exchanges on some 
consumers which may have internal
-                        // memory queues such as seda
-                        if (consumer instanceof ShutdownAware) {
-                            inflight += ((ShutdownAware) 
consumer).getPendingExchangesSize();
-                        }
-                    }
+                    inflight += getPendingInflightExchanges(order);
                     if (inflight > 0) {
                         String routeId = order.getRoute().getId();
                         routeInflight.put(routeId, inflight);
@@ -679,6 +673,30 @@ public class DefaultShutdownStrategy extends 
ServiceSupport implements ShutdownS
     }
 
     /**
+     * Calculates the number of pending exchanges reported by the ShutdownAware services (the consumer and child EIPs) of the given route
+     *
+     * @param order the route
+     * @return number of inflight exchanges
+     */
+    protected static int getPendingInflightExchanges(RouteStartupOrder order) {
+        int inflight = 0;
+
+        // the consumer is the 1st service so we always get the consumer
+        // the child services are EIPs in the routes which may also have 
pending
+        // inflight exchanges (such as the aggregator)
+        for (Service service : order.getServices()) {
+            Set<Service> children = ServiceHelper.getChildServices(service);
+            for (Service child : children) {
+                if (child instanceof ShutdownAware) {
+                    inflight += ((ShutdownAware) 
child).getPendingExchangesSize();
+                }
+            }
+        }
+
+        return inflight;
+    }
+
+    /**
      * Logs information about the inflight exchanges
      *
      * @param infoLevel <tt>true</tt> to log at INFO level, <tt>false</tt> to 
log at DEBUG level

http://git-wip-us.apache.org/repos/asf/camel/blob/e81a4ca6/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
index a00c63e..e4c167e 100644
--- 
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
+++ 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
@@ -159,6 +159,10 @@ public class ManagedAggregateProcessor extends 
ManagedProcessor implements Manag
         return processor.isCompletionFromBatchConsumer();
     }
 
+    public boolean isCompleteAllOnStop() {
+        return processor.isCompleteAllOnStop();
+    }
+
     public int getInProgressCompleteExchanges() {
         return processor.getInProgressCompleteExchanges();
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/e81a4ca6/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java 
b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
index 4b89a55..5573842 100644
--- a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
@@ -44,7 +44,6 @@ import 
org.apache.camel.processor.aggregate.OptimisticLockRetryPolicy;
 import org.apache.camel.spi.AggregationRepository;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.RouteContext;
-import org.apache.camel.util.CamelContextHelper;
 import org.apache.camel.util.concurrent.SynchronousExecutorService;
 
 /**
@@ -115,6 +114,8 @@ public class AggregateDefinition extends 
ProcessorDefinition<AggregateDefinition
     private Boolean discardOnCompletionTimeout;
     @XmlAttribute
     private Boolean forceCompletionOnStop;
+    @XmlAttribute
+    private Boolean completeAllOnStop;
     @XmlTransient
     private AggregateController aggregateController;
     @XmlAttribute
@@ -264,6 +265,9 @@ public class AggregateDefinition extends 
ProcessorDefinition<AggregateDefinition
         if (getForceCompletionOnStop() != null) {
             answer.setForceCompletionOnStop(getForceCompletionOnStop());
         }
+        if (getCompleteAllOnStop() != null) {
+            answer.setCompleteAllOnStop(getCompleteAllOnStop());
+        }
         if (optimisticLockRetryPolicy == null) {
             if (getOptimisticLockRetryPolicyDefinition() != null) {
                 
answer.setOptimisticLockRetryPolicy(getOptimisticLockRetryPolicyDefinition().createOptimisticLockRetryPolicy());
@@ -623,6 +627,14 @@ public class AggregateDefinition extends 
ProcessorDefinition<AggregateDefinition
         this.forceCompletionOnStop = forceCompletionOnStop;
     }
 
+    public Boolean getCompleteAllOnStop() {
+        return completeAllOnStop;
+    }
+
+    public void setCompleteAllOnStop(Boolean completeAllOnStop) {
+        this.completeAllOnStop = completeAllOnStop;
+    }
+
     public AggregateController getAggregateController() {
         return aggregateController;
     }
@@ -867,6 +879,21 @@ public class AggregateDefinition extends 
ProcessorDefinition<AggregateDefinition
     }
 
     /**
+     * Indicates to wait to complete all current and partial (pending) 
aggregated exchanges when the context is stopped.
+     * <p/>
+     * This also means that we will wait for all pending exchanges which are 
stored in the aggregation repository
+     * to complete so the repository is empty before we can stop.
+     * <p/>
+     * You may want to enable this when using the memory based aggregation 
repository, which keeps its data in memory only
+     * and does not store data on disk. When this option is enabled, the 
aggregator waits to complete
+     * all those exchanges before it is stopped, when stopping CamelContext or 
the route using it.
+     */
+    public AggregateDefinition completeAllOnStop() {
+        setCompleteAllOnStop(true);
+        return this;
+    }
+
+    /**
      * When aggregated are completed they are being send out of the aggregator.
      * This option indicates whether or not Camel should use a thread pool 
with multiple threads for concurrency.
      * If no custom thread pool has been specified then Camel creates a 
default pool with 10 concurrent threads.

http://git-wip-us.apache.org/repos/asf/camel/blob/e81a4ca6/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
 
b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 4e0dbca..4e3403c 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -44,6 +44,7 @@ import org.apache.camel.NoSuchEndpointException;
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
 import org.apache.camel.ProducerTemplate;
+import org.apache.camel.ShutdownRunningTask;
 import org.apache.camel.TimeoutMap;
 import org.apache.camel.Traceable;
 import org.apache.camel.spi.AggregationRepository;
@@ -51,6 +52,7 @@ import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.spi.OptimisticLockingAggregationRepository;
 import org.apache.camel.spi.RecoverableAggregationRepository;
+import org.apache.camel.spi.ShutdownAware;
 import org.apache.camel.spi.ShutdownPrepared;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.support.DefaultTimeoutMap;
@@ -81,7 +83,7 @@ import org.slf4j.LoggerFactory;
  * and older prices are discarded). Another idea is to combine line item 
messages
  * together into a single invoice message.
  */
-public class AggregateProcessor extends ServiceSupport implements 
AsyncProcessor, Navigate<Processor>, Traceable, ShutdownPrepared, IdAware {
+public class AggregateProcessor extends ServiceSupport implements 
AsyncProcessor, Navigate<Processor>, Traceable, ShutdownPrepared, 
ShutdownAware, IdAware {
 
     public static final String AGGREGATE_TIMEOUT_CHECKER = 
"AggregateTimeoutChecker";
 
@@ -204,6 +206,7 @@ public class AggregateProcessor extends ServiceSupport 
implements AsyncProcessor
     private AtomicInteger batchConsumerCounter = new AtomicInteger();
     private boolean discardOnCompletionTimeout;
     private boolean forceCompletionOnStop;
+    private boolean completeAllOnStop;
 
     private ProducerTemplate deadLetterProducerTemplate;
 
@@ -891,6 +894,10 @@ public class AggregateProcessor extends ServiceSupport 
implements AsyncProcessor
         this.completionFromBatchConsumer = completionFromBatchConsumer;
     }
 
+    public boolean isCompleteAllOnStop() {
+        return completeAllOnStop;
+    }
+
     public ExceptionHandler getExceptionHandler() {
         return exceptionHandler;
     }
@@ -935,6 +942,10 @@ public class AggregateProcessor extends ServiceSupport 
implements AsyncProcessor
         this.forceCompletionOnStop = forceCompletionOnStop;
     }
 
+    public void setCompleteAllOnStop(boolean completeAllOnStop) {
+        this.completeAllOnStop = completeAllOnStop;
+    }
+
     public void setTimeoutCheckerExecutorService(ScheduledExecutorService 
timeoutCheckerExecutorService) {
         this.timeoutCheckerExecutorService = timeoutCheckerExecutorService;
     }
@@ -1380,6 +1391,23 @@ public class AggregateProcessor extends ServiceSupport 
implements AsyncProcessor
         }
     }
 
+    @Override
+    public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
+        // not in use
+        return true;
+    }
+
+    @Override
+    public int getPendingExchangesSize() {
+        if (completeAllOnStop) {
+            // we want to regard all pending exchanges in the repo as inflight
+            Set<String> keys = getAggregationRepository().getKeys();
+            return keys != null ? keys.size() : 0;
+        } else {
+            return 0;
+        }
+    }
+
     private void doForceCompletionOnStop() {
         int expected = forceCompletionOfAllGroups();
 

http://git-wip-us.apache.org/repos/asf/camel/blob/e81a4ca6/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompleteAllOnStopTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompleteAllOnStopTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompleteAllOnStopTest.java
new file mode 100644
index 0000000..e2d1405
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompleteAllOnStopTest.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.BodyInAggregatingStrategy;
+import org.apache.camel.processor.aggregate.MemoryAggregationRepository;
+
+/**
+ * @version
+ */
+public class AggregateCompleteAllOnStopTest extends ContextTestSupport {
+
+    public void testCompleteAllOnStop() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:aggregated");
+        mock.expectedBodiesReceived("A+B", "C");
+
+        // we only send 3, but we get 2 exchanges completed when stopping
+        // as we tell it to complete all on stop
+        template.sendBodyAndHeader("seda:start", "A", "id", "foo");
+        template.sendBodyAndHeader("seda:start", "B", "id", "foo");
+        template.sendBodyAndHeader("seda:start", "C", "id", "foo");
+
+        context.stopRoute("foo");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("seda:start").routeId("foo")
+                        .aggregate(header("id"), new 
BodyInAggregatingStrategy()).aggregationRepository(new 
MemoryAggregationRepository())
+                        
.completionSize(2).completionTimeout(2000).completeAllOnStop()
+                        .to("mock:aggregated");
+            }
+        };
+    }
+
+}

Reply via email to