Author: davsclaus Date: Fri Feb 26 15:12:49 2010 New Revision: 916707 URL: http://svn.apache.org/viewvc?rev=916707&view=rev Log: CAMEL-2500: Added property on aggregated exchange to know how it completed.
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=916707&r1=916706&r2=916707&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Fri Feb 26 15:12:49 2010 @@ -36,6 +36,7 @@ @Deprecated String AGGREGATED_INDEX = "CamelAggregatedIndex"; String AGGREGATED_SIZE = "CamelAggregatedSize"; + String AGGREGATED_COMPLETED_BY = "CamelAggregatedCompletedBy"; String ASYNC_WAIT = "CamelAsyncWait"; Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=916707&r1=916706&r2=916707&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Fri Feb 26 15:12:49 2010 @@ -219,6 +219,7 @@ if (getCompletionPredicate() != null) { boolean answer = getCompletionPredicate().matches(exchange); if (answer) { + exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "predicate"); return true; } } @@ -228,6 +229,7 @@ if (value != null && value > 0) { int size = exchange.getProperty(Exchange.AGGREGATED_SIZE, 1, Integer.class); if (size >= value) { + exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "size"); return true; } } @@ -235,6 +237,7 @@ if (getCompletionSize() > 0) { int size = exchange.getProperty(Exchange.AGGREGATED_SIZE, 1, Integer.class); if (size >= getCompletionSize()) { + exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "size"); return true; } } @@ -268,6 +271,7 @@ if (size > 0 && batchConsumerCounter.intValue() >= size) { // batch consumer is complete then reset the counter batchConsumerCounter.set(0); + exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "consumer"); return true; } } @@ -433,10 +437,15 @@ } protected boolean isValidForEviction(TimeoutMapEntry<Object, Exchange> entry) { + Object key = entry.getKey(); + Exchange exchange = entry.getValue(); + if (log.isDebugEnabled()) { - log.debug("Completion timeout triggered for correlation key: " + entry.getKey()); + log.debug("Completion timeout triggered for correlation key: " + key); } - onCompletion(entry.getKey(), entry.getValue(), true); + + exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "timeout"); + onCompletion(key, exchange, true); return true; } } Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java?rev=916707&r1=916706&r2=916707&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java Fri Feb 26 15:12:49 2010 @@ -45,6 +45,7 @@ public void testAggregateProcessorCompletionPredicate() throws Exception { MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedBodiesReceived("A+B+END"); + mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "predicate"); Processor done = new SendProcessor(context.getEndpoint("mock:result")); Expression corr = header("id"); @@ -85,6 +86,7 @@ public void testAggregateProcessorCompletionPredicateEager() throws Exception { MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedBodiesReceived("A+B+END"); + mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "predicate"); Processor done = new SendProcessor(context.getEndpoint("mock:result")); Expression corr = header("id"); @@ -133,6 +135,7 @@ private void doTestAggregateProcessorCompletionAggregatedSize(boolean eager) throws Exception { MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedBodiesReceived("A+B+C"); + mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "size"); Processor done = new SendProcessor(context.getEndpoint("mock:result")); Expression corr = header("id"); @@ -180,6 +183,7 @@ private void doTestAggregateProcessorCompletionTimeout(boolean eager) throws Exception { MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedBodiesReceived("A+B+C"); + mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "timeout"); Processor done = new SendProcessor(context.getEndpoint("mock:result")); Expression corr = header("id"); @@ -359,6 +363,7 @@ public void testAggregateUseBatchSizeFromConsumer() throws Exception { MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedBodiesReceived("A+B", "C+D+E"); + mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "consumer"); Processor done = new SendProcessor(context.getEndpoint("mock:result")); Expression corr = header("id");