Repository: camel Updated Branches: refs/heads/master c0f1f02c1 -> 16cc8a920
CAMEL-8525: Aggregate - Expose statistics about completed exchanges. Rename forceCompletion to force so its similar to the others. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/94a6f9a0 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/94a6f9a0 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/94a6f9a0 Branch: refs/heads/master Commit: 94a6f9a0c79a3981630f1e64cb0f3ae4281c50b4 Parents: c0f1f02 Author: Claus Ibsen <davscl...@apache.org> Authored: Sat Mar 21 10:49:52 2015 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sat Mar 21 10:49:52 2015 +0100 ---------------------------------------------------------------------- .../mbean/ManagedAggregateProcessorMBean.java | 37 +++++++ .../mbean/ManagedAggregateProcessor.java | 40 +++++++ .../processor/aggregate/AggregateProcessor.java | 105 ++++++++++++++++++- .../aggregate/AggregateProcessorStatistics.java | 86 +++++++++++++++ .../ManagedAggregateControllerTest.java | 28 ++++- .../aggregator/AggregateControllerTest.java | 4 +- .../AggregateForceCompletionHeaderTest.java | 4 +- .../aggregator/AggregateProcessorTest.java | 2 +- ...awtDBAggregateForceCompletionHeaderTest.java | 4 +- ...velDBAggregateForceCompletionHeaderTest.java | 4 +- .../JdbcAggregateForceCompletionHeaderTest.java | 4 +- 11 files changed, 303 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/94a6f9a0/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java index f4bed8d..a1810c7 100644 --- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java +++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java @@ -78,4 +78,41 @@ public interface ManagedAggregateProcessorMBean extends ManagedProcessorMBean { @ManagedOperation(description = "To force complete of all groups") int forceCompletionOfAllGroups(); + @ManagedAttribute(description = "Total number of exchanges arrived into the aggregator") + long getTotalIn(); + + @ManagedAttribute(description = "Total number of exchanges completed and outgoing from the aggregator") + long getTotalCompleted(); + + @ManagedAttribute(description = "Total number of exchanged completed by completion size trigger") + long getCompletedBySize(); + + @ManagedAttribute(description = "Total number of exchanged completed by completion aggregation strategy trigger") + long getCompletedByStrategy(); + + @ManagedAttribute(description = "Total number of exchanged completed by completion interval (timeout) trigger") + long getCompletedByInterval(); + + @ManagedAttribute(description = "Total number of exchanged completed by completion timeout trigger") + long getCompletedByTimeout(); + + @ManagedAttribute(description = "Total number of exchanged completed by completion predicate trigger") + long getCompletedByPredicate(); + + @ManagedAttribute(description = "Total number of exchanged completed by completion batch consumer trigger") + long getCompletedByBatchConsumer(); + + @ManagedAttribute(description = "Total number of exchanged completed by completion force trigger") + long getCompletedByForce(); + + @ManagedOperation(description = " Reset the statistics counters") + void resetStatistics(); + + @ManagedAttribute(description = "Sets whether statistics is enabled") + boolean isStatisticsEnabled(); + + @ManagedAttribute(description = "Sets whether statistics is enabled") + void setStatisticsEnabled(boolean statisticsEnabled); + + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/94a6f9a0/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java index 3b09c52..7a7705d 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java @@ -149,4 +149,44 @@ public class ManagedAggregateProcessor extends ManagedProcessor implements Manag return 0; } } + + public long getTotalIn() { + return processor.getStatistics().getTotalIn(); + } + + public long getTotalCompleted() { + return processor.getStatistics().getTotalCompleted(); + } + + public long getCompletedBySize() { + return processor.getStatistics().getCompletedBySize(); + } + + public long getCompletedByStrategy() { + return processor.getStatistics().getCompletedByStrategy(); + } + + public long getCompletedByInterval() { + return processor.getStatistics().getCompletedByInterval(); + } + + public long getCompletedByTimeout() { + return processor.getStatistics().getCompletedByTimeout(); + } + + public long getCompletedByPredicate() { + return processor.getStatistics().getCompletedByPredicate(); + } + + public long getCompletedByBatchConsumer() { + return processor.getStatistics().getCompletedByBatchConsumer(); + } + + public long getCompletedByForce() { + return processor.getStatistics().getCompletedByForce(); + } + + public void resetStatistics() { + processor.getStatistics().reset(); + } } http://git-wip-us.apache.org/repos/asf/camel/blob/94a6f9a0/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java index 8e34284..73bf0da 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java @@ -28,6 +28,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -106,11 +107,82 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor private final Set<String> inProgressCompleteExchanges = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>()); private final Map<String, RedeliveryData> redeliveryState = new ConcurrentHashMap<String, RedeliveryData>(); + private final AggregateProcessorStatistics statistics = new Statistics(); + private final AtomicLong totalIn = new AtomicLong(); + private final AtomicLong totalCompleted = new AtomicLong(); + private final AtomicLong completedBySize = new AtomicLong(); + private final AtomicLong completedByStrategy = new AtomicLong(); + private final AtomicLong completedByInterval = new AtomicLong(); + private final AtomicLong completedByTimeout = new AtomicLong(); + private final AtomicLong completedByPredicate = new AtomicLong(); + private final AtomicLong completedByBatchConsumer = new AtomicLong(); + private final AtomicLong completedByForce = new AtomicLong(); + // keep booking about redelivery private class RedeliveryData { int redeliveryCounter; } + private class Statistics implements AggregateProcessorStatistics { + + private boolean statisticsEnabled = true; + + public long getTotalIn() { + return totalIn.get(); + } + + public long getTotalCompleted() { + return totalCompleted.get(); + } + + public long getCompletedBySize() { + return completedBySize.get(); + } + + public long getCompletedByStrategy() { + return completedByStrategy.get(); + } + + public long getCompletedByInterval() { + return completedByInterval.get(); + } + + public long getCompletedByTimeout() { + return completedByTimeout.get(); + } + + public long getCompletedByPredicate() { + return completedByPredicate.get(); + } + + public long getCompletedByBatchConsumer() { + return completedByBatchConsumer.get(); + } + + public long getCompletedByForce() { + return completedByForce.get(); + } + + public void reset() { + totalIn.set(0); + totalCompleted.set(0); + completedBySize.set(0); + completedByStrategy.set(0); + completedByTimeout.set(0); + completedByPredicate.set(0); + completedByBatchConsumer.set(0); + completedByForce.set(0); + } + + public boolean isStatisticsEnabled() { + return statisticsEnabled; + } + + public void setStatisticsEnabled(boolean statisticsEnabled) { + this.statisticsEnabled = statisticsEnabled; + } + } + // options private boolean ignoreInvalidCorrelationKeys; private Integer closeCorrelationKeyOnCompletion; @@ -187,6 +259,10 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor protected void doProcess(Exchange exchange) throws Exception { + if (getStatistics().isStatisticsEnabled()) { + totalIn.incrementAndGet(); + } + //check for the special header to force completion of all groups (and ignore the exchange otherwise) boolean completeAllGroups = exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, false, boolean.class); if (completeAllGroups) { @@ -541,6 +617,27 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor ((CompletionAwareAggregationStrategy) aggregationStrategy).onCompletion(exchange); } + if (getStatistics().isStatisticsEnabled()) { + totalCompleted.incrementAndGet(); + + String completedBy = exchange.getProperty(Exchange.AGGREGATED_COMPLETED_BY, String.class); + if ("interval".equals(completedBy)) { + completedByInterval.incrementAndGet(); + } else if ("timeout".equals(completedBy)) { + completedByTimeout.incrementAndGet(); + } else if ("force".equals(completedBy)) { + completedByForce.incrementAndGet(); + } else if ("consumer".equals(completedBy)) { + completedByBatchConsumer.incrementAndGet(); + } else if ("predicate".equals(completedBy)) { + completedByPredicate.incrementAndGet(); + } else if ("size".equals(completedBy)) { + completedBySize.incrementAndGet(); + } else if ("strategy".equals(completedBy)) { + completedByStrategy.incrementAndGet(); + } + } + // send this exchange executorService.submit(new Runnable() { public void run() { @@ -610,6 +707,10 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor timeoutMap.put(key, exchange.getExchangeId(), timeout); } + public AggregateProcessorStatistics getStatistics() { + return statistics; + } + public int getInProgressCompleteExchanges() { return inProgressCompleteExchanges.size(); } @@ -1217,7 +1318,7 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor total = 1; LOG.trace("Force completion triggered for correlation key: {}", key); // indicate it was completed by a force completion request - exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion"); + exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "force"); Exchange answer = onCompletion(key, exchange, exchange, false); if (answer != null) { onSubmitCompletion(key, answer); @@ -1259,7 +1360,7 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor if (exchange != null) { LOG.trace("Force completion triggered for correlation key: {}", key); // indicate it was completed by a force completion request - exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion"); + exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "force"); Exchange answer = onCompletion(key, exchange, exchange, false); if (answer != null) { onSubmitCompletion(key, answer); http://git-wip-us.apache.org/repos/asf/camel/blob/94a6f9a0/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessorStatistics.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessorStatistics.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessorStatistics.java new file mode 100644 index 0000000..827a594 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessorStatistics.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregate; + +/** + * Various statistics of the aggregator + */ +public interface AggregateProcessorStatistics { + + /** + * Total number of exchanges arrived into the aggregator + */ + long getTotalIn(); + + /** + * Total number of exchanges completed and outgoing from the aggregator + */ + long getTotalCompleted(); + + /** + * Total number of exchanged completed by completion size trigger + */ + long getCompletedBySize(); + + /** + * Total number of exchanged completed by completion strategy trigger + */ + long getCompletedByStrategy(); + + /** + * Total number of exchanged completed by completion interval trigger + */ + long getCompletedByInterval(); + + /** + * Total number of exchanged completed by completion timeout trigger + */ + long getCompletedByTimeout(); + + /** + * Total number of exchanged completed by completion predicate trigger + */ + long getCompletedByPredicate(); + + /** + * Total number of exchanged completed by completion batch consumer trigger + */ + long getCompletedByBatchConsumer(); + + /** + * Total number of exchanged completed by completion force trigger + */ + long getCompletedByForce(); + + /** + * Reset the counters + */ + void reset(); + + /** + * Whether statistics is enabled. + */ + boolean isStatisticsEnabled(); + + /** + * Sets whether statistics is enabled. + * + * @param statisticsEnabled <tt>true</tt> to enable + */ + void setStatisticsEnabled(boolean statisticsEnabled); + +} http://git-wip-us.apache.org/repos/asf/camel/blob/94a6f9a0/camel-core/src/test/java/org/apache/camel/management/ManagedAggregateControllerTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedAggregateControllerTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedAggregateControllerTest.java index 7860eca..2d783f6 100644 --- a/camel-core/src/test/java/org/apache/camel/management/ManagedAggregateControllerTest.java +++ b/camel-core/src/test/java/org/apache/camel/management/ManagedAggregateControllerTest.java @@ -54,7 +54,7 @@ public class ManagedAggregateControllerTest extends ManagementTestSupport { getMockEndpoint("mock:aggregated").expectedMessageCount(2); getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4"); - getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion"); + getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force"); Integer pending = (Integer) mbeanServer.invoke(on, "aggregationRepositoryGroups", null, null); assertEquals(2, pending.intValue()); @@ -67,6 +67,18 @@ public class ManagedAggregateControllerTest extends ManagementTestSupport { Long completed = (Long) mbeanServer.getAttribute(on, "ExchangesCompleted"); assertEquals(4, completed.longValue()); + completed = (Long) mbeanServer.getAttribute(on, "TotalCompleted"); + assertEquals(2, completed.longValue()); + + Long in = (Long) mbeanServer.getAttribute(on, "TotalIn"); + assertEquals(4, in.longValue()); + + Long byForced = (Long) mbeanServer.getAttribute(on, "CompletedByForce"); + assertEquals(2, byForced.longValue()); + + Long bySize = (Long) mbeanServer.getAttribute(on, "CompletedBySize"); + assertEquals(0, bySize.longValue()); + Integer size = (Integer) mbeanServer.getAttribute(on, "CompletionSize"); assertEquals(10, size.longValue()); @@ -103,7 +115,7 @@ public class ManagedAggregateControllerTest extends ManagementTestSupport { getMockEndpoint("mock:aggregated").expectedMessageCount(1); getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3"); - getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion"); + getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force"); Integer pending = (Integer) mbeanServer.invoke(on, "aggregationRepositoryGroups", null, null); assertEquals(2, pending.intValue()); @@ -116,6 +128,18 @@ public class ManagedAggregateControllerTest extends ManagementTestSupport { Long completed = (Long) mbeanServer.getAttribute(on, "ExchangesCompleted"); assertEquals(4, completed.longValue()); + completed = (Long) mbeanServer.getAttribute(on, "TotalCompleted"); + assertEquals(1, completed.longValue()); + + Long in = (Long) mbeanServer.getAttribute(on, "TotalIn"); + assertEquals(4, in.longValue()); + + Long byForced = (Long) mbeanServer.getAttribute(on, "CompletedByForce"); + assertEquals(1, byForced.longValue()); + + Long bySize = (Long) mbeanServer.getAttribute(on, "CompletedBySize"); + assertEquals(0, bySize.longValue()); + Integer size = (Integer) mbeanServer.getAttribute(on, "CompletionSize"); assertEquals(10, size.longValue()); http://git-wip-us.apache.org/repos/asf/camel/blob/94a6f9a0/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java index e9dee08..dbdad1f 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java @@ -51,7 +51,7 @@ public class AggregateControllerTest extends ContextTestSupport { getMockEndpoint("mock:aggregated").expectedMessageCount(2); getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4"); - getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion"); + getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force"); int groups = getAggregateController().forceCompletionOfAllGroups(); assertEquals(2, groups); @@ -72,7 +72,7 @@ public class AggregateControllerTest extends ContextTestSupport { getMockEndpoint("mock:aggregated").expectedMessageCount(1); getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3"); - getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion"); + getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force"); int groups = getAggregateController().forceCompletionOfGroup("1"); assertEquals(1, groups); http://git-wip-us.apache.org/repos/asf/camel/blob/94a6f9a0/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java index 308fb47..2971825 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java @@ -44,7 +44,7 @@ public class AggregateForceCompletionHeaderTest extends ContextTestSupport { getMockEndpoint("mock:aggregated").expectedMessageCount(2); getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4"); - getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion"); + getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force"); //now send the signal message to trigger completion of all groups, message should NOT be aggregated template.sendBodyAndHeader("direct:start", "test5", Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true); @@ -66,7 +66,7 @@ public class AggregateForceCompletionHeaderTest extends ContextTestSupport { getMockEndpoint("mock:aggregated").expectedMessageCount(3); getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4", "test5"); - getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion"); + getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force"); //now send a message to trigger completion of all groups, message should be aggregated Map<String, Object> headers = new HashMap<String, Object>(); http://git-wip-us.apache.org/repos/asf/camel/blob/94a6f9a0/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java index 9ae8b61..daef63d 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java @@ -560,7 +560,7 @@ public class AggregateProcessorTest extends ContextTestSupport { MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedBodiesReceivedInAnyOrder("B+END", "A+END"); - mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion"); + mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force"); Processor done = new SendProcessor(context.getEndpoint("mock:result")); Expression corr = header("id"); http://git-wip-us.apache.org/repos/asf/camel/blob/94a6f9a0/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java ---------------------------------------------------------------------- diff --git a/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java b/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java index ee646e2..2450856 100644 --- a/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java +++ b/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java @@ -50,7 +50,7 @@ public class HawtDBAggregateForceCompletionHeaderTest extends CamelTestSupport { getMockEndpoint("mock:aggregated").expectedMessageCount(2); getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4"); - getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion"); + getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force"); //now send the signal message to trigger completion of all groups, message should NOT be aggregated template.sendBodyAndHeader("direct:start", "test5", Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true); @@ -72,7 +72,7 @@ public class HawtDBAggregateForceCompletionHeaderTest extends CamelTestSupport { getMockEndpoint("mock:aggregated").expectedMessageCount(3); getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4", "test5"); - getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion"); + getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force"); //now send a message to trigger completion of all groups, message should be aggregated Map<String, Object> headers = new HashMap<String, Object>(); http://git-wip-us.apache.org/repos/asf/camel/blob/94a6f9a0/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java ---------------------------------------------------------------------- diff --git a/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java b/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java index b88e4d4..699c7eb 100644 --- a/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java +++ b/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java @@ -50,7 +50,7 @@ public class LevelDBAggregateForceCompletionHeaderTest extends CamelTestSupport getMockEndpoint("mock:aggregated").expectedMessageCount(2); getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4"); - getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion"); + getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force"); //now send the signal message to trigger completion of all groups, message should NOT be aggregated template.sendBodyAndHeader("direct:start", "test5", Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true); @@ -72,7 +72,7 @@ public class LevelDBAggregateForceCompletionHeaderTest extends CamelTestSupport getMockEndpoint("mock:aggregated").expectedMessageCount(3); getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4", "test5"); - getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion"); + getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force"); //now send a message to trigger completion of all groups, message should be aggregated Map<String, Object> headers = new HashMap<String, Object>(); http://git-wip-us.apache.org/repos/asf/camel/blob/94a6f9a0/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java ---------------------------------------------------------------------- diff --git a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java index 7a62cfb..7156a3c 100644 --- a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java +++ b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java @@ -42,7 +42,7 @@ public class JdbcAggregateForceCompletionHeaderTest extends AbstractJdbcAggregat getMockEndpoint("mock:aggregated").expectedMessageCount(2); getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4"); - getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion"); + getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force"); //now send the signal message to trigger completion of all groups, message should NOT be aggregated template.sendBodyAndHeader("direct:start", "test5", Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true); @@ -64,7 +64,7 @@ public class JdbcAggregateForceCompletionHeaderTest extends AbstractJdbcAggregat getMockEndpoint("mock:aggregated").expectedMessageCount(3); getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4", "test5"); - getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion"); + getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force"); //now send a message to trigger completion of all groups, message should be aggregated Map<String, Object> headers = new HashMap<String, Object>();