CAMEL-2939: Make AggregationRepository recoverable

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8a22bd8a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8a22bd8a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8a22bd8a

Branch: refs/heads/master
Commit: 8a22bd8ae0e96288f7578293a6d5e8861ae3fbe2
Parents: 742eda6
Author: Gerald Quintana <gerald.quint...@gmail.com>
Authored: Wed Jan 14 21:50:09 2015 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Wed Feb 18 10:00:49 2015 +0100

----------------------------------------------------------------------
 .../CassandraAggregationRepository.java         | 100 ++++++++++++++++++-
 .../CassandraAggregationRepositoryTest.java     |  51 ++++++++--
 2 files changed, 138 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/8a22bd8a/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepository.java
----------------------------------------------------------------------
diff --git 
a/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepository.java
 
b/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepository.java
index d075e59..e5f689f 100644
--- 
a/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepository.java
+++ 
b/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepository.java
@@ -27,9 +27,11 @@ import com.datastax.driver.core.ConsistencyLevel;
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
+import java.util.concurrent.TimeUnit;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.spi.AggregationRepository;
+import org.apache.camel.spi.RecoverableAggregationRepository;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.utils.cassandra.CassandraSessionHolder;
 import org.slf4j.Logger;
@@ -49,7 +51,7 @@ import static 
org.apache.camel.utils.cassandra.CassandraUtils.generateSelect;
  * Warning: Cassandra is not the best tool for queuing use cases
  * See: 
http://www.datastax.com/dev/blog/cassandra-anti-patterns-queues-and-queue-like-datasets
  */
-public abstract class CassandraAggregationRepository extends ServiceSupport 
implements AggregationRepository {
+public abstract class CassandraAggregationRepository extends ServiceSupport 
implements RecoverableAggregationRepository {
     /**
      * Logger
      */
@@ -95,13 +97,21 @@ public abstract class CassandraAggregationRepository 
extends ServiceSupport impl
     private PreparedStatement selectStatement;
     private PreparedStatement deleteStatement;
     /**
-     * Prepared statement used to get keys and exchange ids
+     * Prepared statement used to get exchangeIds and exchange ids
      */
     private PreparedStatement selectKeyIdStatement;
     /**
      * Prepared statement used to delete with key and exchange id
      */
     private PreparedStatement deleteIfIdStatement;
+    
+    private long recoveryIntervalInMillis = 5000;
+
+    private boolean useRecovery = true;
+
+    private String deadLetterUri;
+
+    private int maximumRedeliveries;
 
     public CassandraAggregationRepository() {
     }
@@ -278,7 +288,7 @@ public abstract class CassandraAggregationRepository 
extends ServiceSupport impl
     }
 
     /**
-     * Get aggregation keys from aggregation table.
+     * Get aggregation exchangeIds from aggregation table.
      */
     @Override
     public Set<String> getKeys() {
@@ -292,6 +302,41 @@ public abstract class CassandraAggregationRepository 
extends ServiceSupport impl
     }
 
 
+    /**
+     * Get exchange IDs to be recovered
+     * @return Exchange IDs
+     */
+    @Override
+    public Set<String> scan(CamelContext camelContext) {
+        List<Row> rows = selectKeyIds();
+        Set<String> exchangeIds = new HashSet<String>(rows.size());
+        for (Row row : rows) {
+            exchangeIds.add(row.getString(exchangeIdColumn));
+        }
+        return exchangeIds;
+    }
+
+    /**
+     * Get exchange by exchange ID.
+     * This is far from optimal.
+     */
+    @Override
+    public Exchange recover(CamelContext camelContext, String exchangeId) {
+        List<Row> rows = selectKeyIds();
+        String keyColumnName = getPKColumns()[1];
+        String lKey = null;
+        for (Row row : rows) {
+            String lExchangeId = row.getString(exchangeIdColumn);
+            if (lExchangeId.equals(exchangeId)) {
+                lKey = row.getString(keyColumnName);
+                break;
+            }            
+        }
+        return lKey == null ? null : get(camelContext, lKey);
+    }
+    
+    
+
     // 
-------------------------------------------------------------------------
     // Getters and Setters
 
@@ -359,4 +404,53 @@ public abstract class CassandraAggregationRepository 
extends ServiceSupport impl
         this.ttl = ttl;
     }
 
+    @Override
+    public long getRecoveryIntervalInMillis() {
+        return recoveryIntervalInMillis;
+    }
+
+    public void setRecoveryIntervalInMillis(long recoveryIntervalInMillis) {
+        this.recoveryIntervalInMillis = recoveryIntervalInMillis;
+    }
+
+    @Override
+    public void setRecoveryInterval(long interval, TimeUnit timeUnit) {
+        this.recoveryIntervalInMillis = timeUnit.toMillis(interval);
+    }
+
+    @Override
+    public void setRecoveryInterval(long recoveryIntervalInMillis) {
+        this.recoveryIntervalInMillis = recoveryIntervalInMillis;
+    }
+
+    @Override
+    public boolean isUseRecovery() {
+        return useRecovery;
+    }
+
+    @Override
+    public void setUseRecovery(boolean useRecovery) {
+        this.useRecovery = useRecovery;
+    }
+
+    @Override
+    public String getDeadLetterUri() {
+        return deadLetterUri;
+    }
+
+    @Override
+    public void setDeadLetterUri(String deadLetterUri) {
+        this.deadLetterUri = deadLetterUri;
+    }
+
+    @Override
+    public int getMaximumRedeliveries() {
+        return maximumRedeliveries;
+    }
+
+    @Override
+    public void setMaximumRedeliveries(int maximumRedeliveries) {
+        this.maximumRedeliveries = maximumRedeliveries;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/8a22bd8a/components/camel-cassandraql/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepositoryTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-cassandraql/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepositoryTest.java
 
b/components/camel-cassandraql/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepositoryTest.java
index 1011212..15714ac 100644
--- 
a/components/camel-cassandraql/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepositoryTest.java
+++ 
b/components/camel-cassandraql/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepositoryTest.java
@@ -149,10 +149,7 @@ public class CassandraAggregationRepositoryTest {
     public void testGetKeys() {
         // Given
         String[] keys = {"GetKeys1", "GetKeys2"};
-        for (String key : keys) {
-            Exchange exchange = new DefaultExchange(camelContext);
-            aggregationRepository.add(camelContext, key, exchange);
-        }
+        addExchanges(keys);
         // When
         Set<String> keySet = aggregationRepository.getKeys();
         // Then
@@ -182,19 +179,53 @@ public class CassandraAggregationRepositoryTest {
     @Test
     public void testConfirmNotExist() {
         // Given
+        String[] keys=new String[3];
         for (int i = 1; i < 4; i++) {
-            String key = "Confirm_" + i;
+            keys[i-1] = "Confirm" + i; 
+        }
+        addExchanges(keys);
+        for(String key:keys) {
+            assertTrue(exists(key));
+        }
+        // When
+        aggregationRepository.confirm(camelContext, "Exchange-Confirm5");
+        // Then
+        for (String key: keys) {
+            assertTrue(exists(key));
+        }
+    }
+    private void addExchanges(String ... keys) {
+        for (String key : keys) {
             Exchange exchange = new DefaultExchange(camelContext);
-            exchange.setExchangeId("Exchange_" + i);
+            exchange.setExchangeId("Exchange-"+key);
             aggregationRepository.add(camelContext, key, exchange);
-            assertTrue(exists(key));
         }
+    }
+
+    @Test
+    public void testScan() {
+        // Given
+        String[] keys = {"Scan1", "Scan2"};
+        addExchanges(keys);
         // When
-        aggregationRepository.confirm(camelContext, "Exchange_5");
+        Set<String> exchangeIdSet = aggregationRepository.scan(camelContext);
         // Then
-        for (int i = 1; i < 4; i++) {
-            assertTrue(exists("Confirm_" + i));
+        for (String key : keys) {
+            assertTrue(exchangeIdSet.contains("Exchange-"+key));
         }
     }
 
+    @Test
+    public void testRecover() {
+        // Given
+        String[] keys = {"Recover1", "Recover2"};
+        addExchanges(keys);
+        // When
+        Exchange exchange2 = aggregationRepository.recover(camelContext, 
"Exchange-Recover2");
+        Exchange exchange3 = aggregationRepository.recover(camelContext, 
"Exchange-Recover3");
+        // Then
+        assertNotNull(exchange2);
+        assertNull(exchange3);
+    }
+
 }

Reply via email to