CAMEL-2939: Make AggregationRepository recoverable
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8a22bd8a Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8a22bd8a Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8a22bd8a Branch: refs/heads/master Commit: 8a22bd8ae0e96288f7578293a6d5e8861ae3fbe2 Parents: 742eda6 Author: Gerald Quintana <gerald.quint...@gmail.com> Authored: Wed Jan 14 21:50:09 2015 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Feb 18 10:00:49 2015 +0100 ---------------------------------------------------------------------- .../CassandraAggregationRepository.java | 100 ++++++++++++++++++- .../CassandraAggregationRepositoryTest.java | 51 ++++++++-- 2 files changed, 138 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/8a22bd8a/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepository.java ---------------------------------------------------------------------- diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepository.java b/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepository.java index d075e59..e5f689f 100644 --- a/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepository.java +++ b/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepository.java @@ -27,9 +27,11 @@ import com.datastax.driver.core.ConsistencyLevel; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; +import java.util.concurrent.TimeUnit; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.spi.AggregationRepository; +import org.apache.camel.spi.RecoverableAggregationRepository; import org.apache.camel.support.ServiceSupport; import org.apache.camel.utils.cassandra.CassandraSessionHolder; import org.slf4j.Logger; @@ -49,7 +51,7 @@ import static org.apache.camel.utils.cassandra.CassandraUtils.generateSelect; * Warning: Cassandra is not the best tool for queuing use cases * See: http://www.datastax.com/dev/blog/cassandra-anti-patterns-queues-and-queue-like-datasets */ -public abstract class CassandraAggregationRepository extends ServiceSupport implements AggregationRepository { +public abstract class CassandraAggregationRepository extends ServiceSupport implements RecoverableAggregationRepository { /** * Logger */ @@ -95,13 +97,21 @@ public abstract class CassandraAggregationRepository extends ServiceSupport impl private PreparedStatement selectStatement; private PreparedStatement deleteStatement; /** - * Prepared statement used to get keys and exchange ids + * Prepared statement used to get exchangeIds and exchange ids */ private PreparedStatement selectKeyIdStatement; /** * Prepared statement used to delete with key and exchange id */ private PreparedStatement deleteIfIdStatement; + + private long recoveryIntervalInMillis = 5000; + + private boolean useRecovery = true; + + private String deadLetterUri; + + private int maximumRedeliveries; public CassandraAggregationRepository() { } @@ -278,7 +288,7 @@ public abstract class CassandraAggregationRepository extends ServiceSupport impl } /** - * Get aggregation keys from aggregation table. + * Get aggregation exchangeIds from aggregation table. */ @Override public Set<String> getKeys() { @@ -292,6 +302,41 @@ public abstract class CassandraAggregationRepository extends ServiceSupport impl } + /** + * Get exchange IDs to be recovered + * @return Exchange IDs + */ + @Override + public Set<String> scan(CamelContext camelContext) { + List<Row> rows = selectKeyIds(); + Set<String> exchangeIds = new HashSet<String>(rows.size()); + for (Row row : rows) { + exchangeIds.add(row.getString(exchangeIdColumn)); + } + return exchangeIds; + } + + /** + * Get exchange by exchange ID. + * This is far from optimal. + */ + @Override + public Exchange recover(CamelContext camelContext, String exchangeId) { + List<Row> rows = selectKeyIds(); + String keyColumnName = getPKColumns()[1]; + String lKey = null; + for (Row row : rows) { + String lExchangeId = row.getString(exchangeIdColumn); + if (lExchangeId.equals(exchangeId)) { + lKey = row.getString(keyColumnName); + break; + } + } + return lKey == null ? null : get(camelContext, lKey); + } + + + // ------------------------------------------------------------------------- // Getters and Setters @@ -359,4 +404,53 @@ public abstract class CassandraAggregationRepository extends ServiceSupport impl this.ttl = ttl; } + @Override + public long getRecoveryIntervalInMillis() { + return recoveryIntervalInMillis; + } + + public void setRecoveryIntervalInMillis(long recoveryIntervalInMillis) { + this.recoveryIntervalInMillis = recoveryIntervalInMillis; + } + + @Override + public void setRecoveryInterval(long interval, TimeUnit timeUnit) { + this.recoveryIntervalInMillis = timeUnit.toMillis(interval); + } + + @Override + public void setRecoveryInterval(long recoveryIntervalInMillis) { + this.recoveryIntervalInMillis = recoveryIntervalInMillis; + } + + @Override + public boolean isUseRecovery() { + return useRecovery; + } + + @Override + public void setUseRecovery(boolean useRecovery) { + this.useRecovery = useRecovery; + } + + @Override + public String getDeadLetterUri() { + return deadLetterUri; + } + + @Override + public void setDeadLetterUri(String deadLetterUri) { + this.deadLetterUri = deadLetterUri; + } + + @Override + public int getMaximumRedeliveries() { + return maximumRedeliveries; + } + + @Override + public void setMaximumRedeliveries(int maximumRedeliveries) { + this.maximumRedeliveries = maximumRedeliveries; + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/8a22bd8a/components/camel-cassandraql/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepositoryTest.java ---------------------------------------------------------------------- diff --git a/components/camel-cassandraql/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepositoryTest.java b/components/camel-cassandraql/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepositoryTest.java index 1011212..15714ac 100644 --- a/components/camel-cassandraql/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepositoryTest.java +++ b/components/camel-cassandraql/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepositoryTest.java @@ -149,10 +149,7 @@ public class CassandraAggregationRepositoryTest { public void testGetKeys() { // Given String[] keys = {"GetKeys1", "GetKeys2"}; - for (String key : keys) { - Exchange exchange = new DefaultExchange(camelContext); - aggregationRepository.add(camelContext, key, exchange); - } + addExchanges(keys); // When Set<String> keySet = aggregationRepository.getKeys(); // Then @@ -182,19 +179,53 @@ public class CassandraAggregationRepositoryTest { @Test public void testConfirmNotExist() { // Given + String[] keys=new String[3]; for (int i = 1; i < 4; i++) { - String key = "Confirm_" + i; + keys[i-1] = "Confirm" + i; + } + addExchanges(keys); + for(String key:keys) { + assertTrue(exists(key)); + } + // When + aggregationRepository.confirm(camelContext, "Exchange-Confirm5"); + // Then + for (String key: keys) { + assertTrue(exists(key)); + } + } + private void addExchanges(String ... keys) { + for (String key : keys) { Exchange exchange = new DefaultExchange(camelContext); - exchange.setExchangeId("Exchange_" + i); + exchange.setExchangeId("Exchange-"+key); aggregationRepository.add(camelContext, key, exchange); - assertTrue(exists(key)); } + } + + @Test + public void testScan() { + // Given + String[] keys = {"Scan1", "Scan2"}; + addExchanges(keys); // When - aggregationRepository.confirm(camelContext, "Exchange_5"); + Set<String> exchangeIdSet = aggregationRepository.scan(camelContext); // Then - for (int i = 1; i < 4; i++) { - assertTrue(exists("Confirm_" + i)); + for (String key : keys) { + assertTrue(exchangeIdSet.contains("Exchange-"+key)); } } + @Test + public void testRecover() { + // Given + String[] keys = {"Recover1", "Recover2"}; + addExchanges(keys); + // When + Exchange exchange2 = aggregationRepository.recover(camelContext, "Exchange-Recover2"); + Exchange exchange3 = aggregationRepository.recover(camelContext, "Exchange-Recover3"); + // Then + assertNotNull(exchange2); + assertNull(exchange3); + } + }