Repository: camel
Updated Branches:
  refs/heads/master e770d288b -> 3fbf4e851


CAMEL-8864: Include exchange property to support the flexible aggregator toolkit


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3fbf4e85
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3fbf4e85
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3fbf4e85

Branch: refs/heads/master
Commit: 3fbf4e8513fae66d4de09b42c0c09dd7e7e76881
Parents: e770d28
Author: Claus Ibsen <davscl...@apache.org>
Authored: Tue Jun 23 19:43:10 2015 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Tue Jun 23 19:43:10 2015 +0200

----------------------------------------------------------------------
 camel-core/src/main/java/org/apache/camel/Exchange.java         | 1 +
 .../apache/camel/util/toolbox/FlexibleAggregationStrategy.java  | 5 ++---
 .../processor/aggregate/cassandra/CassandraCamelCodec.java      | 2 ++
 .../org/apache/camel/component/hawtdb/HawtDBCamelCodec.java     | 2 ++
 .../org/apache/camel/component/leveldb/LevelDBCamelCodec.java   | 2 ++
 .../apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java   | 3 +++
 6 files changed, 12 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/3fbf4e85/camel-core/src/main/java/org/apache/camel/Exchange.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/Exchange.java b/camel-core/src/main/java/org/apache/camel/Exchange.java
index d3f841b..5cfe78e 100644
--- a/camel-core/src/main/java/org/apache/camel/Exchange.java
+++ b/camel-core/src/main/java/org/apache/camel/Exchange.java
@@ -77,6 +77,7 @@ public interface Exchange {
     String AGGREGATED_TIMEOUT               = "CamelAggregatedTimeout";
     String AGGREGATED_COMPLETED_BY          = "CamelAggregatedCompletedBy";
     String AGGREGATED_CORRELATION_KEY       = "CamelAggregatedCorrelationKey";
+    String AGGREGATED_COLLECTION_GUARD      = "CamelAggregatedCollectionGuard";
     String AGGREGATION_STRATEGY             = "CamelAggregationStrategy";
     String AGGREGATION_COMPLETE_CURRENT_GROUP = "CamelAggregationCompleteCurrentGroup";
     String AGGREGATION_COMPLETE_ALL_GROUPS  = "CamelAggregationCompleteAllGroups";

http://git-wip-us.apache.org/repos/asf/camel/blob/3fbf4e85/camel-core/src/main/java/org/apache/camel/util/toolbox/FlexibleAggregationStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/util/toolbox/FlexibleAggregationStrategy.java b/camel-core/src/main/java/org/apache/camel/util/toolbox/FlexibleAggregationStrategy.java
index 1348163..88a304c 100644
--- a/camel-core/src/main/java/org/apache/camel/util/toolbox/FlexibleAggregationStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/util/toolbox/FlexibleAggregationStrategy.java
@@ -53,7 +53,6 @@ public class FlexibleAggregationStrategy<E extends Object> implements Aggregatio
         CompletionAwareAggregationStrategy, TimeoutAwareAggregationStrategy {
 
     private static final Logger LOG = LoggerFactory.getLogger(FlexibleAggregationStrategy.class);
-    private static final String COLLECTION_AGGR_GUARD_PROPERTY = "CamelFlexAggrStrCollectionGuard";
 
     private Expression pickExpression = ExpressionBuilder.bodyExpression();
     private Predicate conditionPredicate;
@@ -270,14 +269,14 @@ public class FlexibleAggregationStrategy<E extends Object> implements Aggregatio
     private Collection<E> safeInsertIntoCollection(Exchange oldExchange, Collection<E> oldValue, E toInsert) {
         Collection<E> collection = null;
         try {
-            if (oldValue == null || oldExchange.getProperty(COLLECTION_AGGR_GUARD_PROPERTY, Boolean.class) == null) {
+            if (oldValue == null || oldExchange.getProperty(Exchange.AGGREGATED_COLLECTION_GUARD, Boolean.class) == null) {
                 try {
                     collection = collectionType.newInstance();
                 } catch (Exception e) {
                     LOG.warn("Could not instantiate collection of type {}. Aborting aggregation.", collectionType);
                     throw ObjectHelper.wrapCamelExecutionException(oldExchange, e);
                 }
-                oldExchange.setProperty(COLLECTION_AGGR_GUARD_PROPERTY, Boolean.FALSE);
+                oldExchange.setProperty(Exchange.AGGREGATED_COLLECTION_GUARD, Boolean.FALSE);
             } else {
                 collection = collectionType.cast(oldValue);
             }

http://git-wip-us.apache.org/repos/asf/camel/blob/3fbf4e85/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraCamelCodec.java
----------------------------------------------------------------------
diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraCamelCodec.java b/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraCamelCodec.java
index af5b2e5..38416d2 100644
--- a/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraCamelCodec.java
+++ b/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraCamelCodec.java
@@ -45,6 +45,8 @@ public class CassandraCamelCodec {
         DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_COMPLETED_BY, exchange.getProperty(Exchange.AGGREGATED_COMPLETED_BY, String.class));
         // add the aggregated correlation key property to retain
         DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_CORRELATION_KEY, exchange.getProperty(Exchange.AGGREGATED_CORRELATION_KEY, String.class));
+        // and a guard property if using the flexible toolbox aggregator
+        DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_COLLECTION_GUARD, exchange.getProperty(Exchange.AGGREGATED_COLLECTION_GUARD, String.class));
         // persist the from endpoint as well
         if (exchange.getFromEndpoint() != null) {
             DefaultExchangeHolder.addProperty(pe, "CamelAggregatedFromEndpoint", exchange.getFromEndpoint().getEndpointUri());

http://git-wip-us.apache.org/repos/asf/camel/blob/3fbf4e85/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelCodec.java
----------------------------------------------------------------------
diff --git a/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelCodec.java b/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelCodec.java
index a7a5f11..1420ef7 100644
--- a/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelCodec.java
+++ b/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelCodec.java
@@ -61,6 +61,8 @@ public final class HawtDBCamelCodec {
         DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_COMPLETED_BY, exchange.getProperty(Exchange.AGGREGATED_COMPLETED_BY, String.class));
         // add the aggregated correlation key property to retain
         DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_CORRELATION_KEY, exchange.getProperty(Exchange.AGGREGATED_CORRELATION_KEY, String.class));
+        // and a guard property if using the flexible toolbox aggregator
+        DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_COLLECTION_GUARD, exchange.getProperty(Exchange.AGGREGATED_COLLECTION_GUARD, String.class));
         // persist the from endpoint as well
         if (exchange.getFromEndpoint() != null) {
             DefaultExchangeHolder.addProperty(pe, "CamelAggregatedFromEndpoint", exchange.getFromEndpoint().getEndpointUri());

http://git-wip-us.apache.org/repos/asf/camel/blob/3fbf4e85/components/camel-leveldb/src/main/java/org/apache/camel/component/leveldb/LevelDBCamelCodec.java
----------------------------------------------------------------------
diff --git a/components/camel-leveldb/src/main/java/org/apache/camel/component/leveldb/LevelDBCamelCodec.java b/components/camel-leveldb/src/main/java/org/apache/camel/component/leveldb/LevelDBCamelCodec.java
index 40cefe4..8585972 100644
--- a/components/camel-leveldb/src/main/java/org/apache/camel/component/leveldb/LevelDBCamelCodec.java
+++ b/components/camel-leveldb/src/main/java/org/apache/camel/component/leveldb/LevelDBCamelCodec.java
@@ -61,6 +61,8 @@ public final class LevelDBCamelCodec {
         DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_COMPLETED_BY, exchange.getProperty(Exchange.AGGREGATED_COMPLETED_BY, String.class));
         // add the aggregated correlation key property to retain
         DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_CORRELATION_KEY, exchange.getProperty(Exchange.AGGREGATED_CORRELATION_KEY, String.class));
+        // and a guard property if using the flexible toolbox aggregator
+        DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_COLLECTION_GUARD, exchange.getProperty(Exchange.AGGREGATED_COLLECTION_GUARD, String.class));
         // persist the from endpoint as well
         if (exchange.getFromEndpoint() != null) {
             DefaultExchangeHolder.addProperty(pe, "CamelAggregatedFromEndpoint", exchange.getFromEndpoint().getEndpointUri());

http://git-wip-us.apache.org/repos/asf/camel/blob/3fbf4e85/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java
index 7244ee3..ea331bd 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java
@@ -44,6 +44,9 @@ public class JdbcCamelCodec {
         DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_COMPLETED_BY, exchange.getProperty(Exchange.AGGREGATED_COMPLETED_BY, String.class));
         // add the aggregated correlation key property to retain
         DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_CORRELATION_KEY, exchange.getProperty(Exchange.AGGREGATED_CORRELATION_KEY, String.class));
+        DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_CORRELATION_KEY, exchange.getProperty(Exchange.AGGREGATED_CORRELATION_KEY, String.class));
+        // and a guard property if using the flexible toolbox aggregator
+        DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_COLLECTION_GUARD, exchange.getProperty(Exchange.AGGREGATED_COLLECTION_GUARD, String.class));
         // persist the from endpoint as well
         if (exchange.getFromEndpoint() != null) {
             DefaultExchangeHolder.addProperty(pe, "CamelAggregatedFromEndpoint", exchange.getFromEndpoint().getEndpointUri());

Reply via email to