Repository: camel
Updated Branches:
  refs/heads/master c0f1f02c1 -> 16cc8a920


CAMEL-8525: Aggregate - Expose statistics about completed exchanges. Rename 
forceCompletion to force so its similar to the others.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/94a6f9a0
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/94a6f9a0
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/94a6f9a0

Branch: refs/heads/master
Commit: 94a6f9a0c79a3981630f1e64cb0f3ae4281c50b4
Parents: c0f1f02
Author: Claus Ibsen <davscl...@apache.org>
Authored: Sat Mar 21 10:49:52 2015 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Sat Mar 21 10:49:52 2015 +0100

----------------------------------------------------------------------
 .../mbean/ManagedAggregateProcessorMBean.java   |  37 +++++++
 .../mbean/ManagedAggregateProcessor.java        |  40 +++++++
 .../processor/aggregate/AggregateProcessor.java | 105 ++++++++++++++++++-
 .../aggregate/AggregateProcessorStatistics.java |  86 +++++++++++++++
 .../ManagedAggregateControllerTest.java         |  28 ++++-
 .../aggregator/AggregateControllerTest.java     |   4 +-
 .../AggregateForceCompletionHeaderTest.java     |   4 +-
 .../aggregator/AggregateProcessorTest.java      |   2 +-
 ...awtDBAggregateForceCompletionHeaderTest.java |   4 +-
 ...velDBAggregateForceCompletionHeaderTest.java |   4 +-
 .../JdbcAggregateForceCompletionHeaderTest.java |   4 +-
 11 files changed, 303 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/94a6f9a0/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
 
b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
index f4bed8d..a1810c7 100644
--- 
a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
+++ 
b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
@@ -78,4 +78,41 @@ public interface ManagedAggregateProcessorMBean extends 
ManagedProcessorMBean {
     @ManagedOperation(description = "To force complete of all groups")
     int forceCompletionOfAllGroups();
 
+    @ManagedAttribute(description = "Total number of exchanges arrived into 
the aggregator")
+    long getTotalIn();
+
+    @ManagedAttribute(description = "Total number of exchanges completed and 
outgoing from the aggregator")
+    long getTotalCompleted();
+
+    @ManagedAttribute(description = "Total number of exchanged completed by 
completion size trigger")
+    long getCompletedBySize();
+
+    @ManagedAttribute(description = "Total number of exchanged completed by 
completion aggregation strategy trigger")
+    long getCompletedByStrategy();
+
+    @ManagedAttribute(description = "Total number of exchanged completed by 
completion interval (timeout) trigger")
+    long getCompletedByInterval();
+
+    @ManagedAttribute(description = "Total number of exchanged completed by 
completion timeout trigger")
+    long getCompletedByTimeout();
+
+    @ManagedAttribute(description = "Total number of exchanged completed by 
completion predicate trigger")
+    long getCompletedByPredicate();
+
+    @ManagedAttribute(description = "Total number of exchanged completed by 
completion batch consumer trigger")
+    long getCompletedByBatchConsumer();
+
+    @ManagedAttribute(description = "Total number of exchanged completed by 
completion force trigger")
+    long getCompletedByForce();
+
+    @ManagedOperation(description = " Reset the statistics counters")
+    void resetStatistics();
+
+    @ManagedAttribute(description = "Sets whether statistics is enabled")
+    boolean isStatisticsEnabled();
+
+    @ManagedAttribute(description = "Sets whether statistics is enabled")
+    void setStatisticsEnabled(boolean statisticsEnabled);
+
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/94a6f9a0/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
index 3b09c52..7a7705d 100644
--- 
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
+++ 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
@@ -149,4 +149,44 @@ public class ManagedAggregateProcessor extends 
ManagedProcessor implements Manag
             return 0;
         }
     }
+
+    public long getTotalIn() {
+        return processor.getStatistics().getTotalIn();
+    }
+
+    public long getTotalCompleted() {
+        return processor.getStatistics().getTotalCompleted();
+    }
+
+    public long getCompletedBySize() {
+        return processor.getStatistics().getCompletedBySize();
+    }
+
+    public long getCompletedByStrategy() {
+        return processor.getStatistics().getCompletedByStrategy();
+    }
+
+    public long getCompletedByInterval() {
+        return processor.getStatistics().getCompletedByInterval();
+    }
+
+    public long getCompletedByTimeout() {
+        return processor.getStatistics().getCompletedByTimeout();
+    }
+
+    public long getCompletedByPredicate() {
+        return processor.getStatistics().getCompletedByPredicate();
+    }
+
+    public long getCompletedByBatchConsumer() {
+        return processor.getStatistics().getCompletedByBatchConsumer();
+    }
+
+    public long getCompletedByForce() {
+        return processor.getStatistics().getCompletedByForce();
+    }
+
+    public void resetStatistics() {
+        processor.getStatistics().reset();
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/94a6f9a0/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
 
b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 8e34284..73bf0da 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -106,11 +107,82 @@ public class AggregateProcessor extends ServiceSupport 
implements AsyncProcessor
     private final Set<String> inProgressCompleteExchanges = 
Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
     private final Map<String, RedeliveryData> redeliveryState = new 
ConcurrentHashMap<String, RedeliveryData>();
 
+    private final AggregateProcessorStatistics statistics = new Statistics();
+    private final AtomicLong totalIn = new AtomicLong();
+    private final AtomicLong totalCompleted = new AtomicLong();
+    private final AtomicLong completedBySize = new AtomicLong();
+    private final AtomicLong completedByStrategy = new AtomicLong();
+    private final AtomicLong completedByInterval = new AtomicLong();
+    private final AtomicLong completedByTimeout = new AtomicLong();
+    private final AtomicLong completedByPredicate = new AtomicLong();
+    private final AtomicLong completedByBatchConsumer = new AtomicLong();
+    private final AtomicLong completedByForce = new AtomicLong();
+
     // keep booking about redelivery
     private class RedeliveryData {
         int redeliveryCounter;
     }
 
+    private class Statistics implements AggregateProcessorStatistics {
+
+        private boolean statisticsEnabled = true;
+
+        public long getTotalIn() {
+            return totalIn.get();
+        }
+
+        public long getTotalCompleted() {
+            return totalCompleted.get();
+        }
+
+        public long getCompletedBySize() {
+            return completedBySize.get();
+        }
+
+        public long getCompletedByStrategy() {
+            return completedByStrategy.get();
+        }
+
+        public long getCompletedByInterval() {
+            return completedByInterval.get();
+        }
+
+        public long getCompletedByTimeout() {
+            return completedByTimeout.get();
+        }
+
+        public long getCompletedByPredicate() {
+            return completedByPredicate.get();
+        }
+
+        public long getCompletedByBatchConsumer() {
+            return completedByBatchConsumer.get();
+        }
+
+        public long getCompletedByForce() {
+            return completedByForce.get();
+        }
+
+        public void reset() {
+            totalIn.set(0);
+            totalCompleted.set(0);
+            completedBySize.set(0);
+            completedByStrategy.set(0);
+            completedByTimeout.set(0);
+            completedByPredicate.set(0);
+            completedByBatchConsumer.set(0);
+            completedByForce.set(0);
+        }
+
+        public boolean isStatisticsEnabled() {
+            return statisticsEnabled;
+        }
+
+        public void setStatisticsEnabled(boolean statisticsEnabled) {
+            this.statisticsEnabled = statisticsEnabled;
+        }
+    }
+
     // options
     private boolean ignoreInvalidCorrelationKeys;
     private Integer closeCorrelationKeyOnCompletion;
@@ -187,6 +259,10 @@ public class AggregateProcessor extends ServiceSupport 
implements AsyncProcessor
 
     protected void doProcess(Exchange exchange) throws Exception {
 
+        if (getStatistics().isStatisticsEnabled()) {
+            totalIn.incrementAndGet();
+        }
+
         //check for the special header to force completion of all groups (and 
ignore the exchange otherwise)
         boolean completeAllGroups = 
exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, false, 
boolean.class);
         if (completeAllGroups) {
@@ -541,6 +617,27 @@ public class AggregateProcessor extends ServiceSupport 
implements AsyncProcessor
             ((CompletionAwareAggregationStrategy) 
aggregationStrategy).onCompletion(exchange);
         }
 
+        if (getStatistics().isStatisticsEnabled()) {
+            totalCompleted.incrementAndGet();
+
+            String completedBy = 
exchange.getProperty(Exchange.AGGREGATED_COMPLETED_BY, String.class);
+            if ("interval".equals(completedBy)) {
+                completedByInterval.incrementAndGet();
+            } else if ("timeout".equals(completedBy)) {
+                completedByTimeout.incrementAndGet();
+            } else if ("force".equals(completedBy)) {
+                completedByForce.incrementAndGet();
+            } else if ("consumer".equals(completedBy)) {
+                completedByBatchConsumer.incrementAndGet();
+            } else if ("predicate".equals(completedBy)) {
+                completedByPredicate.incrementAndGet();
+            } else if ("size".equals(completedBy)) {
+                completedBySize.incrementAndGet();
+            } else if ("strategy".equals(completedBy)) {
+                completedByStrategy.incrementAndGet();
+            }
+        }
+
         // send this exchange
         executorService.submit(new Runnable() {
             public void run() {
@@ -610,6 +707,10 @@ public class AggregateProcessor extends ServiceSupport 
implements AsyncProcessor
         timeoutMap.put(key, exchange.getExchangeId(), timeout);
     }
 
+    public AggregateProcessorStatistics getStatistics() {
+        return statistics;
+    }
+
     public int getInProgressCompleteExchanges() {
         return inProgressCompleteExchanges.size();
     }
@@ -1217,7 +1318,7 @@ public class AggregateProcessor extends ServiceSupport 
implements AsyncProcessor
                 total = 1;
                 LOG.trace("Force completion triggered for correlation key: 
{}", key);
                 // indicate it was completed by a force completion request
-                exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, 
"forceCompletion");
+                exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, 
"force");
                 Exchange answer = onCompletion(key, exchange, exchange, false);
                 if (answer != null) {
                     onSubmitCompletion(key, answer);
@@ -1259,7 +1360,7 @@ public class AggregateProcessor extends ServiceSupport 
implements AsyncProcessor
                     if (exchange != null) {
                         LOG.trace("Force completion triggered for correlation 
key: {}", key);
                         // indicate it was completed by a force completion 
request
-                        exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, 
"forceCompletion");
+                        exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, 
"force");
                         Exchange answer = onCompletion(key, exchange, 
exchange, false);
                         if (answer != null) {
                             onSubmitCompletion(key, answer);

http://git-wip-us.apache.org/repos/asf/camel/blob/94a6f9a0/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessorStatistics.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessorStatistics.java
 
b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessorStatistics.java
new file mode 100644
index 0000000..827a594
--- /dev/null
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessorStatistics.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate;
+
+/**
+ * Various statistics of the aggregator
+ */
+public interface AggregateProcessorStatistics {
+
+    /**
+     * Total number of exchanges arrived into the aggregator
+     */
+    long getTotalIn();
+
+    /**
+     * Total number of exchanges completed and outgoing from the aggregator
+     */
+    long getTotalCompleted();
+
+    /**
+     * Total number of exchanged completed by completion size trigger
+     */
+    long getCompletedBySize();
+
+    /**
+     * Total number of exchanged completed by completion strategy trigger
+     */
+    long getCompletedByStrategy();
+
+    /**
+     * Total number of exchanged completed by completion interval trigger
+     */
+    long getCompletedByInterval();
+
+    /**
+     * Total number of exchanged completed by completion timeout trigger
+     */
+    long getCompletedByTimeout();
+
+    /**
+     * Total number of exchanged completed by completion predicate trigger
+     */
+    long getCompletedByPredicate();
+
+    /**
+     * Total number of exchanged completed by completion batch consumer trigger
+     */
+    long getCompletedByBatchConsumer();
+
+    /**
+     * Total number of exchanged completed by completion force trigger
+     */
+    long getCompletedByForce();
+
+    /**
+     * Reset the counters
+     */
+    void reset();
+
+    /**
+     * Whether statistics is enabled.
+     */
+    boolean isStatisticsEnabled();
+
+    /**
+     * Sets whether statistics is enabled.
+     *
+     * @param statisticsEnabled <tt>true</tt> to enable
+     */
+    void setStatisticsEnabled(boolean statisticsEnabled);
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/94a6f9a0/camel-core/src/test/java/org/apache/camel/management/ManagedAggregateControllerTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/management/ManagedAggregateControllerTest.java
 
b/camel-core/src/test/java/org/apache/camel/management/ManagedAggregateControllerTest.java
index 7860eca..2d783f6 100644
--- 
a/camel-core/src/test/java/org/apache/camel/management/ManagedAggregateControllerTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/management/ManagedAggregateControllerTest.java
@@ -54,7 +54,7 @@ public class ManagedAggregateControllerTest extends 
ManagementTestSupport {
 
         getMockEndpoint("mock:aggregated").expectedMessageCount(2);
         
getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3",
 "test2test4");
-        
getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY,
 "forceCompletion");
+        
getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY,
 "force");
 
         Integer pending = (Integer) mbeanServer.invoke(on, 
"aggregationRepositoryGroups", null, null);
         assertEquals(2, pending.intValue());
@@ -67,6 +67,18 @@ public class ManagedAggregateControllerTest extends 
ManagementTestSupport {
         Long completed = (Long) mbeanServer.getAttribute(on, 
"ExchangesCompleted");
         assertEquals(4, completed.longValue());
 
+        completed = (Long) mbeanServer.getAttribute(on, "TotalCompleted");
+        assertEquals(2, completed.longValue());
+
+        Long in = (Long) mbeanServer.getAttribute(on, "TotalIn");
+        assertEquals(4, in.longValue());
+
+        Long byForced = (Long) mbeanServer.getAttribute(on, 
"CompletedByForce");
+        assertEquals(2, byForced.longValue());
+
+        Long bySize = (Long) mbeanServer.getAttribute(on, "CompletedBySize");
+        assertEquals(0, bySize.longValue());
+
         Integer size = (Integer) mbeanServer.getAttribute(on, 
"CompletionSize");
         assertEquals(10, size.longValue());
 
@@ -103,7 +115,7 @@ public class ManagedAggregateControllerTest extends 
ManagementTestSupport {
 
         getMockEndpoint("mock:aggregated").expectedMessageCount(1);
         
getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3");
-        
getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY,
 "forceCompletion");
+        
getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY,
 "force");
 
         Integer pending = (Integer) mbeanServer.invoke(on, 
"aggregationRepositoryGroups", null, null);
         assertEquals(2, pending.intValue());
@@ -116,6 +128,18 @@ public class ManagedAggregateControllerTest extends 
ManagementTestSupport {
         Long completed = (Long) mbeanServer.getAttribute(on, 
"ExchangesCompleted");
         assertEquals(4, completed.longValue());
 
+        completed = (Long) mbeanServer.getAttribute(on, "TotalCompleted");
+        assertEquals(1, completed.longValue());
+
+        Long in = (Long) mbeanServer.getAttribute(on, "TotalIn");
+        assertEquals(4, in.longValue());
+
+        Long byForced = (Long) mbeanServer.getAttribute(on, 
"CompletedByForce");
+        assertEquals(1, byForced.longValue());
+
+        Long bySize = (Long) mbeanServer.getAttribute(on, "CompletedBySize");
+        assertEquals(0, bySize.longValue());
+
         Integer size = (Integer) mbeanServer.getAttribute(on, 
"CompletionSize");
         assertEquals(10, size.longValue());
 

http://git-wip-us.apache.org/repos/asf/camel/blob/94a6f9a0/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java
index e9dee08..dbdad1f 100644
--- 
a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java
@@ -51,7 +51,7 @@ public class AggregateControllerTest extends 
ContextTestSupport {
 
         getMockEndpoint("mock:aggregated").expectedMessageCount(2);
         
getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3",
 "test2test4");
-        
getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY,
 "forceCompletion");
+        
getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY,
 "force");
 
         int groups = getAggregateController().forceCompletionOfAllGroups();
         assertEquals(2, groups);
@@ -72,7 +72,7 @@ public class AggregateControllerTest extends 
ContextTestSupport {
 
         getMockEndpoint("mock:aggregated").expectedMessageCount(1);
         
getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3");
-        
getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY,
 "forceCompletion");
+        
getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY,
 "force");
 
         int groups = getAggregateController().forceCompletionOfGroup("1");
         assertEquals(1, groups);

http://git-wip-us.apache.org/repos/asf/camel/blob/94a6f9a0/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java
index 308fb47..2971825 100644
--- 
a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java
@@ -44,7 +44,7 @@ public class AggregateForceCompletionHeaderTest extends 
ContextTestSupport {
 
         getMockEndpoint("mock:aggregated").expectedMessageCount(2);
         
getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3",
 "test2test4");
-        
getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY,
 "forceCompletion");
+        
getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY,
 "force");
 
         //now send the signal message to trigger completion of all groups, 
message should NOT be aggregated
         template.sendBodyAndHeader("direct:start", "test5", 
Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
@@ -66,7 +66,7 @@ public class AggregateForceCompletionHeaderTest extends 
ContextTestSupport {
 
         getMockEndpoint("mock:aggregated").expectedMessageCount(3);
         
getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3",
 "test2test4", "test5");
-        
getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY,
 "forceCompletion");
+        
getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY,
 "force");
 
         //now send a message to trigger completion of all groups, message 
should be aggregated
         Map<String, Object> headers = new HashMap<String, Object>();

http://git-wip-us.apache.org/repos/asf/camel/blob/94a6f9a0/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
index 9ae8b61..daef63d 100644
--- 
a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
@@ -560,7 +560,7 @@ public class AggregateProcessorTest extends 
ContextTestSupport {
 
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedBodiesReceivedInAnyOrder("B+END", "A+END");
-        mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, 
"forceCompletion");
+        mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, 
"force");
 
         Processor done = new SendProcessor(context.getEndpoint("mock:result"));
         Expression corr = header("id");

http://git-wip-us.apache.org/repos/asf/camel/blob/94a6f9a0/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java
 
b/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java
index ee646e2..2450856 100644
--- 
a/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java
+++ 
b/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java
@@ -50,7 +50,7 @@ public class HawtDBAggregateForceCompletionHeaderTest extends 
CamelTestSupport {
 
         getMockEndpoint("mock:aggregated").expectedMessageCount(2);
         
getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3",
 "test2test4");
-        
getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY,
 "forceCompletion");
+        
getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY,
 "force");
 
         //now send the signal message to trigger completion of all groups, 
message should NOT be aggregated
         template.sendBodyAndHeader("direct:start", "test5", 
Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
@@ -72,7 +72,7 @@ public class HawtDBAggregateForceCompletionHeaderTest extends 
CamelTestSupport {
 
         getMockEndpoint("mock:aggregated").expectedMessageCount(3);
         
getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3",
 "test2test4", "test5");
-        
getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY,
 "forceCompletion");
+        
getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY,
 "force");
 
         //now send a message to trigger completion of all groups, message 
should be aggregated
         Map<String, Object> headers = new HashMap<String, Object>();

http://git-wip-us.apache.org/repos/asf/camel/blob/94a6f9a0/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java
 
b/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java
index b88e4d4..699c7eb 100644
--- 
a/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java
+++ 
b/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java
@@ -50,7 +50,7 @@ public class LevelDBAggregateForceCompletionHeaderTest 
extends CamelTestSupport
 
         getMockEndpoint("mock:aggregated").expectedMessageCount(2);
         
getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3",
 "test2test4");
-        
getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY,
 "forceCompletion");
+        
getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY,
 "force");
 
         //now send the signal message to trigger completion of all groups, 
message should NOT be aggregated
         template.sendBodyAndHeader("direct:start", "test5", 
Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
@@ -72,7 +72,7 @@ public class LevelDBAggregateForceCompletionHeaderTest 
extends CamelTestSupport
 
         getMockEndpoint("mock:aggregated").expectedMessageCount(3);
         
getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3",
 "test2test4", "test5");
-        
getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY,
 "forceCompletion");
+        
getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY,
 "force");
 
         //now send a message to trigger completion of all groups, message 
should be aggregated
         Map<String, Object> headers = new HashMap<String, Object>();

http://git-wip-us.apache.org/repos/asf/camel/blob/94a6f9a0/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java
 
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java
index 7a62cfb..7156a3c 100644
--- 
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java
+++ 
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java
@@ -42,7 +42,7 @@ public class JdbcAggregateForceCompletionHeaderTest extends 
AbstractJdbcAggregat
 
         getMockEndpoint("mock:aggregated").expectedMessageCount(2);
         
getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3",
 "test2test4");
-        
getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY,
 "forceCompletion");
+        
getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY,
 "force");
 
         //now send the signal message to trigger completion of all groups, 
message should NOT be aggregated
         template.sendBodyAndHeader("direct:start", "test5", 
Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
@@ -64,7 +64,7 @@ public class JdbcAggregateForceCompletionHeaderTest extends 
AbstractJdbcAggregat
 
         getMockEndpoint("mock:aggregated").expectedMessageCount(3);
         
getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3",
 "test2test4", "test5");
-        
getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY,
 "forceCompletion");
+        
getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY,
 "force");
 
         //now send a message to trigger completion of all groups, message 
should be aggregated
         Map<String, Object> headers = new HashMap<String, Object>();

Reply via email to