Repository: camel Updated Branches: refs/heads/master e770d288b -> 3fbf4e851
CAMEL-8864: Include exchange property to support the flexible aggregator toolkit Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3fbf4e85 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3fbf4e85 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3fbf4e85 Branch: refs/heads/master Commit: 3fbf4e8513fae66d4de09b42c0c09dd7e7e76881 Parents: e770d28 Author: Claus Ibsen <davscl...@apache.org> Authored: Tue Jun 23 19:43:10 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Tue Jun 23 19:43:10 2015 +0200 ---------------------------------------------------------------------- camel-core/src/main/java/org/apache/camel/Exchange.java | 1 + .../apache/camel/util/toolbox/FlexibleAggregationStrategy.java | 5 ++--- .../processor/aggregate/cassandra/CassandraCamelCodec.java | 2 ++ .../org/apache/camel/component/hawtdb/HawtDBCamelCodec.java | 2 ++ .../org/apache/camel/component/leveldb/LevelDBCamelCodec.java | 2 ++ .../apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java | 3 +++ 6 files changed, 12 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/3fbf4e85/camel-core/src/main/java/org/apache/camel/Exchange.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/Exchange.java b/camel-core/src/main/java/org/apache/camel/Exchange.java index d3f841b..5cfe78e 100644 --- a/camel-core/src/main/java/org/apache/camel/Exchange.java +++ b/camel-core/src/main/java/org/apache/camel/Exchange.java @@ -77,6 +77,7 @@ public interface Exchange { String AGGREGATED_TIMEOUT = "CamelAggregatedTimeout"; String AGGREGATED_COMPLETED_BY = "CamelAggregatedCompletedBy"; String AGGREGATED_CORRELATION_KEY = "CamelAggregatedCorrelationKey"; + String AGGREGATED_COLLECTION_GUARD = "CamelAggregatedCollectionGuard"; String AGGREGATION_STRATEGY = "CamelAggregationStrategy"; String AGGREGATION_COMPLETE_CURRENT_GROUP = "CamelAggregationCompleteCurrentGroup"; String AGGREGATION_COMPLETE_ALL_GROUPS = "CamelAggregationCompleteAllGroups"; http://git-wip-us.apache.org/repos/asf/camel/blob/3fbf4e85/camel-core/src/main/java/org/apache/camel/util/toolbox/FlexibleAggregationStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/util/toolbox/FlexibleAggregationStrategy.java b/camel-core/src/main/java/org/apache/camel/util/toolbox/FlexibleAggregationStrategy.java index 1348163..88a304c 100644 --- a/camel-core/src/main/java/org/apache/camel/util/toolbox/FlexibleAggregationStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/util/toolbox/FlexibleAggregationStrategy.java @@ -53,7 +53,6 @@ public class FlexibleAggregationStrategy<E extends Object> implements Aggregatio CompletionAwareAggregationStrategy, TimeoutAwareAggregationStrategy { private static final Logger LOG = LoggerFactory.getLogger(FlexibleAggregationStrategy.class); - private static final String COLLECTION_AGGR_GUARD_PROPERTY = "CamelFlexAggrStrCollectionGuard"; private Expression pickExpression = ExpressionBuilder.bodyExpression(); private Predicate conditionPredicate; @@ -270,14 +269,14 @@ public class FlexibleAggregationStrategy<E extends Object> implements Aggregatio private Collection<E> safeInsertIntoCollection(Exchange oldExchange, Collection<E> oldValue, E toInsert) { Collection<E> collection = null; try { - if (oldValue == null || oldExchange.getProperty(COLLECTION_AGGR_GUARD_PROPERTY, Boolean.class) == null) { + if (oldValue == null || oldExchange.getProperty(Exchange.AGGREGATED_COLLECTION_GUARD, Boolean.class) == null) { try { collection = collectionType.newInstance(); } catch (Exception e) { LOG.warn("Could not instantiate collection of type {}. Aborting aggregation.", collectionType); throw ObjectHelper.wrapCamelExecutionException(oldExchange, e); } - oldExchange.setProperty(COLLECTION_AGGR_GUARD_PROPERTY, Boolean.FALSE); + oldExchange.setProperty(Exchange.AGGREGATED_COLLECTION_GUARD, Boolean.FALSE); } else { collection = collectionType.cast(oldValue); } http://git-wip-us.apache.org/repos/asf/camel/blob/3fbf4e85/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraCamelCodec.java ---------------------------------------------------------------------- diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraCamelCodec.java b/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraCamelCodec.java index af5b2e5..38416d2 100644 --- a/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraCamelCodec.java +++ b/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraCamelCodec.java @@ -45,6 +45,8 @@ public class CassandraCamelCodec { DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_COMPLETED_BY, exchange.getProperty(Exchange.AGGREGATED_COMPLETED_BY, String.class)); // add the aggregated correlation key property to retain DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_CORRELATION_KEY, exchange.getProperty(Exchange.AGGREGATED_CORRELATION_KEY, String.class)); + // and a guard property if using the flexible toolbox aggregator + DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_COLLECTION_GUARD, exchange.getProperty(Exchange.AGGREGATED_COLLECTION_GUARD, String.class)); // persist the from endpoint as well if (exchange.getFromEndpoint() != null) { DefaultExchangeHolder.addProperty(pe, "CamelAggregatedFromEndpoint", exchange.getFromEndpoint().getEndpointUri()); http://git-wip-us.apache.org/repos/asf/camel/blob/3fbf4e85/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelCodec.java ---------------------------------------------------------------------- diff --git a/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelCodec.java b/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelCodec.java index a7a5f11..1420ef7 100644 --- a/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelCodec.java +++ b/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelCodec.java @@ -61,6 +61,8 @@ public final class HawtDBCamelCodec { DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_COMPLETED_BY, exchange.getProperty(Exchange.AGGREGATED_COMPLETED_BY, String.class)); // add the aggregated correlation key property to retain DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_CORRELATION_KEY, exchange.getProperty(Exchange.AGGREGATED_CORRELATION_KEY, String.class)); + // and a guard property if using the flexible toolbox aggregator + DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_COLLECTION_GUARD, exchange.getProperty(Exchange.AGGREGATED_COLLECTION_GUARD, String.class)); // persist the from endpoint as well if (exchange.getFromEndpoint() != null) { DefaultExchangeHolder.addProperty(pe, "CamelAggregatedFromEndpoint", exchange.getFromEndpoint().getEndpointUri()); http://git-wip-us.apache.org/repos/asf/camel/blob/3fbf4e85/components/camel-leveldb/src/main/java/org/apache/camel/component/leveldb/LevelDBCamelCodec.java ---------------------------------------------------------------------- diff --git a/components/camel-leveldb/src/main/java/org/apache/camel/component/leveldb/LevelDBCamelCodec.java b/components/camel-leveldb/src/main/java/org/apache/camel/component/leveldb/LevelDBCamelCodec.java index 40cefe4..8585972 100644 --- a/components/camel-leveldb/src/main/java/org/apache/camel/component/leveldb/LevelDBCamelCodec.java +++ b/components/camel-leveldb/src/main/java/org/apache/camel/component/leveldb/LevelDBCamelCodec.java @@ -61,6 +61,8 @@ public final class LevelDBCamelCodec { DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_COMPLETED_BY, exchange.getProperty(Exchange.AGGREGATED_COMPLETED_BY, String.class)); // add the aggregated correlation key property to retain DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_CORRELATION_KEY, exchange.getProperty(Exchange.AGGREGATED_CORRELATION_KEY, String.class)); + // and a guard property if using the flexible toolbox aggregator + DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_COLLECTION_GUARD, exchange.getProperty(Exchange.AGGREGATED_COLLECTION_GUARD, String.class)); // persist the from endpoint as well if (exchange.getFromEndpoint() != null) { DefaultExchangeHolder.addProperty(pe, "CamelAggregatedFromEndpoint", exchange.getFromEndpoint().getEndpointUri()); http://git-wip-us.apache.org/repos/asf/camel/blob/3fbf4e85/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java ---------------------------------------------------------------------- diff --git a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java index 7244ee3..ea331bd 100644 --- a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java +++ b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java @@ -44,6 +44,9 @@ public class JdbcCamelCodec { DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_COMPLETED_BY, exchange.getProperty(Exchange.AGGREGATED_COMPLETED_BY, String.class)); // add the aggregated correlation key property to retain DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_CORRELATION_KEY, exchange.getProperty(Exchange.AGGREGATED_CORRELATION_KEY, String.class)); + DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_CORRELATION_KEY, exchange.getProperty(Exchange.AGGREGATED_CORRELATION_KEY, String.class)); + // and a guard property if using the flexible toolbox aggregator + DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_COLLECTION_GUARD, exchange.getProperty(Exchange.AGGREGATED_COLLECTION_GUARD, String.class)); // persist the from endpoint as well if (exchange.getFromEndpoint() != null) { DefaultExchangeHolder.addProperty(pe, "CamelAggregatedFromEndpoint", exchange.getFromEndpoint().getEndpointUri());