Author: davsclaus
Date: Fri Feb 26 15:12:49 2010
New Revision: 916707

URL: http://svn.apache.org/viewvc?rev=916707&view=rev
Log:
CAMEL-2500: Added property on aggregated exchange to know how it completed.

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=916707&r1=916706&r2=916707&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java 
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Fri Feb 
26 15:12:49 2010
@@ -36,6 +36,7 @@
     @Deprecated
     String AGGREGATED_INDEX = "CamelAggregatedIndex";
     String AGGREGATED_SIZE  = "CamelAggregatedSize";
+    String AGGREGATED_COMPLETED_BY = "CamelAggregatedCompletedBy";
 
     String ASYNC_WAIT = "CamelAsyncWait";
 

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=916707&r1=916706&r2=916707&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
 Fri Feb 26 15:12:49 2010
@@ -219,6 +219,7 @@
         if (getCompletionPredicate() != null) {
             boolean answer = getCompletionPredicate().matches(exchange);
             if (answer) {
+                exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, 
"predicate");
                 return true;
             }
         }
@@ -228,6 +229,7 @@
             if (value != null && value > 0) {
                 int size = exchange.getProperty(Exchange.AGGREGATED_SIZE, 1, 
Integer.class);
                 if (size >= value) {
+                    exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, 
"size");
                     return true;
                 }
             }
@@ -235,6 +237,7 @@
         if (getCompletionSize() > 0) {
             int size = exchange.getProperty(Exchange.AGGREGATED_SIZE, 1, 
Integer.class);
             if (size >= getCompletionSize()) {
+                exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "size");
                 return true;
             }
         }
@@ -268,6 +271,7 @@
             if (size > 0 && batchConsumerCounter.intValue() >= size) {
                 // batch consumer is complete then reset the counter
                 batchConsumerCounter.set(0);
+                exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, 
"consumer");
                 return true;
             }
         }
@@ -433,10 +437,15 @@
         }
 
         protected boolean isValidForEviction(TimeoutMapEntry<Object, Exchange> 
entry) {
+            Object key = entry.getKey();
+            Exchange exchange = entry.getValue();
+
             if (log.isDebugEnabled()) {
-                log.debug("Completion timeout triggered for correlation key: " 
+ entry.getKey());
+                log.debug("Completion timeout triggered for correlation key: " 
+ key);
             }
-            onCompletion(entry.getKey(), entry.getValue(), true);
+
+            exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "timeout");
+            onCompletion(key, exchange, true);
             return true;
         }
     }

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java?rev=916707&r1=916706&r2=916707&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
 Fri Feb 26 15:12:49 2010
@@ -45,6 +45,7 @@
     public void testAggregateProcessorCompletionPredicate() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedBodiesReceived("A+B+END");
+        mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, 
"predicate");
 
         Processor done = new SendProcessor(context.getEndpoint("mock:result"));
         Expression corr = header("id");
@@ -85,6 +86,7 @@
     public void testAggregateProcessorCompletionPredicateEager() throws 
Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedBodiesReceived("A+B+END");
+        mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, 
"predicate");
 
         Processor done = new SendProcessor(context.getEndpoint("mock:result"));
         Expression corr = header("id");
@@ -133,6 +135,7 @@
     private void doTestAggregateProcessorCompletionAggregatedSize(boolean 
eager) throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedBodiesReceived("A+B+C");
+        mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, 
"size");
 
         Processor done = new SendProcessor(context.getEndpoint("mock:result"));
         Expression corr = header("id");
@@ -180,6 +183,7 @@
     private void doTestAggregateProcessorCompletionTimeout(boolean eager) 
throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedBodiesReceived("A+B+C");
+        mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, 
"timeout");
 
         Processor done = new SendProcessor(context.getEndpoint("mock:result"));
         Expression corr = header("id");
@@ -359,6 +363,7 @@
     public void testAggregateUseBatchSizeFromConsumer() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedBodiesReceived("A+B", "C+D+E");
+        mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, 
"consumer");
 
         Processor done = new SendProcessor(context.getEndpoint("mock:result"));
         Expression corr = header("id");


Reply via email to