CAMEL-2939: Add unnamed repositories for aggregation and idempotent

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/94ebb0c5
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/94ebb0c5
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/94ebb0c5

Branch: refs/heads/master
Commit: 94ebb0c5c78c194ce715ddcb178bd87fd7aaccbe
Parents: 8a22bd8
Author: Gerald Quintana <gerald.quint...@gmail.com>
Authored: Sat Jan 17 22:25:21 2015 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Wed Feb 18 10:00:49 2015 +0100

----------------------------------------------------------------------
 .../CassandraAggregationRepository.java         |  41 ++--
 .../NamedCassandraAggregationRepository.java    |  22 +-
 .../CassandraIdempotentRepository.java          |  21 +-
 .../NamedCassandraIdempotentRepository.java     |  21 +-
 .../camel/utils/cassandra/CassandraUtils.java   |  31 ++-
 .../CassandraAggregationRepositoryTest.java     |   4 +-
 .../cassandra/CassandraAggregationTest.java     |   3 +-
 ...NamedCassandraAggregationRepositoryTest.java | 232 +++++++++++++++++++
 .../CassandraIdempotentRepositoryTest.java      |   4 +-
 .../cassandra/CassandraIdempotentTest.java      |   3 +-
 .../NamedCassandraIdempotentRepositoryTest.java | 144 ++++++++++++
 .../src/test/resources/AggregationDataSet.cql   |   3 +-
 .../src/test/resources/IdempotentDataSet.cql    |  15 +-
 .../test/resources/NamedAggregationDataSet.cql  |   7 +
 .../test/resources/NamedIdempotentDataSet.cql   |  11 +
 15 files changed, 491 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/94ebb0c5/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepository.java
----------------------------------------------------------------------
diff --git 
a/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepository.java
 
b/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepository.java
index e5f689f..40fc257 100644
--- 
a/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepository.java
+++ 
b/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepository.java
@@ -51,7 +51,7 @@ import static 
org.apache.camel.utils.cassandra.CassandraUtils.generateSelect;
  * Warning: Cassandra is not the best tool for queuing use cases
  * See: 
http://www.datastax.com/dev/blog/cassandra-anti-patterns-queues-and-queue-like-datasets
  */
-public abstract class CassandraAggregationRepository extends ServiceSupport 
implements RecoverableAggregationRepository {
+public class CassandraAggregationRepository extends ServiceSupport implements 
RecoverableAggregationRepository {
     /**
      * Logger
      */
@@ -73,9 +73,13 @@ public abstract class CassandraAggregationRepository extends 
ServiceSupport impl
      */
     private String exchangeColumn = "EXCHANGE";
     /**
+     * Values used as primary key prefix
+     */
+    private Object[] prefixPKValues = new Object[0];
+    /**
      * Primary key columns
      */
-    private String[] pkColumns;
+    private String[] pkColumns = {"KEY"};
     /**
      * Exchange marshaller/unmarshaller
      */
@@ -125,15 +129,10 @@ public abstract class CassandraAggregationRepository 
extends ServiceSupport impl
     }
 
     /**
-     * Get fixed primary key values.
-     */
-    protected abstract Object[] getPKValues();
-
-    /**
-     * Generate primary key values: fixed + aggregation key.
+     * Generate primary key values from aggregation key.
      */
     protected Object[] getPKValues(String key) {
-        return append(getPKValues(), key);
+        return append(prefixPKValues, key);
     }
 
     /**
@@ -239,14 +238,13 @@ public abstract class CassandraAggregationRepository 
extends ServiceSupport impl
      */
     @Override
     public void confirm(CamelContext camelContext, String exchangeId) {
-        Object[] pkValues = getPKValues();
         String keyColumn = getKeyColumn();
-        LOGGER.debug("Selecting Ids {} ", pkValues);
+        LOGGER.debug("Selecting Ids");
         List<Row> rows = selectKeyIds();
         for (Row row : rows) {
             if (row.getString(exchangeIdColumn).equals(exchangeId)) {
                 String key = row.getString(keyColumn);
-                Object[] cqlParams = append(pkValues, key, exchangeId);
+                Object[] cqlParams = append(getPKValues(key), exchangeId);
                 LOGGER.debug("Deleting If Id {} ", cqlParams);
                 getSession().execute(deleteIfIdStatement.bind(cqlParams));
             }
@@ -281,10 +279,9 @@ public abstract class CassandraAggregationRepository 
extends ServiceSupport impl
         selectKeyIdStatement = 
applyConsistencyLevel(getSession().prepare(cql), readConsistencyLevel);
     }
 
-    private List<Row> selectKeyIds() {
-        Object[] pkValues = getPKValues();
-        LOGGER.debug("Selecting keys {}", pkValues);
-        return getSession().execute(selectKeyIdStatement.bind(pkValues)).all();
+    protected List<Row> selectKeyIds() {
+        LOGGER.debug("Selecting keys {}", getPrefixPKValues());
+        return 
getSession().execute(selectKeyIdStatement.bind(getPrefixPKValues())).all();
     }
 
     /**
@@ -294,7 +291,7 @@ public abstract class CassandraAggregationRepository 
extends ServiceSupport impl
     public Set<String> getKeys() {
         List<Row> rows = selectKeyIds();
         Set<String> keys = new HashSet<String>(rows.size());
-        String keyColumnName = getPKColumns()[1];
+        String keyColumnName = getKeyColumn();
         for (Row row : rows) {
             keys.add(row.getString(keyColumnName));
         }
@@ -323,7 +320,7 @@ public abstract class CassandraAggregationRepository 
extends ServiceSupport impl
     @Override
     public Exchange recover(CamelContext camelContext, String exchangeId) {
         List<Row> rows = selectKeyIds();
-        String keyColumnName = getPKColumns()[1];
+        String keyColumnName = getKeyColumn();
         String lKey = null;
         for (Row row : rows) {
             String lExchangeId = row.getString(exchangeIdColumn);
@@ -356,6 +353,14 @@ public abstract class CassandraAggregationRepository 
extends ServiceSupport impl
         this.table = table;
     }
 
+    public Object[] getPrefixPKValues() {
+        return prefixPKValues;
+    }
+
+    public void setPrefixPKValues(Object ... prefixPKValues) {
+        this.prefixPKValues = prefixPKValues;
+    }
+
     public String[] getPKColumns() {
         return pkColumns;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/94ebb0c5/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/NamedCassandraAggregationRepository.java
----------------------------------------------------------------------
diff --git 
a/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/NamedCassandraAggregationRepository.java
 
b/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/NamedCassandraAggregationRepository.java
index 347ed75..f13230f 100644
--- 
a/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/NamedCassandraAggregationRepository.java
+++ 
b/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/NamedCassandraAggregationRepository.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.processor.aggregate.cassandra;
 
+
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.Session;
 
@@ -24,38 +25,29 @@ import com.datastax.driver.core.Session;
  * columns as primary key: name (partition key) and key (clustering key).
  */
 public class NamedCassandraAggregationRepository extends 
CassandraAggregationRepository {
-    /**
-     * Aggregation repository name
-     */
-    private String name;
-
     public NamedCassandraAggregationRepository() {
         setPKColumns("NAME", "KEY");
+        setName("DEFAULT");
     }
 
     public NamedCassandraAggregationRepository(Session session, String name) {
         super(session);
-        this.name = name;
         setPKColumns("NAME", "KEY");
+        setName(name);
     }
 
     public NamedCassandraAggregationRepository(Cluster cluster, String 
keyspace, String name) {
         super(cluster, keyspace);
-        this.name = name;
         setPKColumns("NAME", "KEY");
-    }
-
-    @Override
-    protected Object[] getPKValues() {
-        return new Object[]{name};
+        setName(name);
     }
 
     public String getName() {
-        return name;
+        return (String) getPrefixPKValues()[0];
     }
 
-    public void setName(String name) {
-        this.name = name;
+    public final void setName(String name) {
+        setPrefixPKValues(name);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/94ebb0c5/components/camel-cassandraql/src/main/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentRepository.java
----------------------------------------------------------------------
diff --git 
a/components/camel-cassandraql/src/main/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentRepository.java
 
b/components/camel-cassandraql/src/main/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentRepository.java
index ddb7168..7b3699d 100644
--- 
a/components/camel-cassandraql/src/main/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentRepository.java
+++ 
b/components/camel-cassandraql/src/main/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentRepository.java
@@ -28,6 +28,7 @@ import 
org.apache.camel.utils.cassandra.CassandraSessionHolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.camel.utils.cassandra.CassandraUtils.append;
 import static 
org.apache.camel.utils.cassandra.CassandraUtils.applyConsistencyLevel;
 import static org.apache.camel.utils.cassandra.CassandraUtils.generateDelete;
 import static org.apache.camel.utils.cassandra.CassandraUtils.generateInsert;
@@ -42,7 +43,7 @@ import static 
org.apache.camel.utils.cassandra.CassandraUtils.generateSelect;
  *
  * @param <K> Message Id
  */
-public abstract class CassandraIdempotentRepository<K> extends ServiceSupport 
implements IdempotentRepository<K> {
+public class CassandraIdempotentRepository<K> extends ServiceSupport 
implements IdempotentRepository<K> {
     /**
      * Logger
      */
@@ -56,9 +57,13 @@ public abstract class CassandraIdempotentRepository<K> 
extends ServiceSupport im
      */
     private String table = "CAMEL_IDEMPOTENT";
     /**
+     * Values used as primary key prefix
+     */
+    private Object[] prefixPKValues = new Object[0];
+    /**
      * Primary key columns
      */
-    private String[] pkColumns;
+    private String[] pkColumns = {"KEY"};
     /**
      * Time to live in seconds used for inserts
      */
@@ -97,7 +102,9 @@ public abstract class CassandraIdempotentRepository<K> 
extends ServiceSupport im
         }
     }
 
-    protected abstract Object[] getPKValues(K key);
+    protected Object[] getPKValues(K key) {
+        return append(prefixPKValues, key);
+    }
     // 
-------------------------------------------------------------------------
     // Lifecycle methods
 
@@ -217,4 +224,12 @@ public abstract class CassandraIdempotentRepository<K> 
extends ServiceSupport im
         this.readConsistencyLevel = readConsistencyLevel;
     }
 
+    public Object[] getPrefixPKValues() {
+        return prefixPKValues;
+    }
+
+    public void setPrefixPKValues(Object[] prefixPKValues) {
+        this.prefixPKValues = prefixPKValues;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/94ebb0c5/components/camel-cassandraql/src/main/java/org/apache/camel/processor/idempotent/cassandra/NamedCassandraIdempotentRepository.java
----------------------------------------------------------------------
diff --git 
a/components/camel-cassandraql/src/main/java/org/apache/camel/processor/idempotent/cassandra/NamedCassandraIdempotentRepository.java
 
b/components/camel-cassandraql/src/main/java/org/apache/camel/processor/idempotent/cassandra/NamedCassandraIdempotentRepository.java
index 73219fc..6a10212 100644
--- 
a/components/camel-cassandraql/src/main/java/org/apache/camel/processor/idempotent/cassandra/NamedCassandraIdempotentRepository.java
+++ 
b/components/camel-cassandraql/src/main/java/org/apache/camel/processor/idempotent/cassandra/NamedCassandraIdempotentRepository.java
@@ -24,38 +24,29 @@ import com.datastax.driver.core.Session;
  * columns as primary key: name (partition key) and key (clustering key).
  */
 public class NamedCassandraIdempotentRepository<K> extends 
CassandraIdempotentRepository<K> {
-    /**
-     * Idempotent repository name
-     */
-    private String name;
-
     public NamedCassandraIdempotentRepository() {
         setPKColumns("NAME", "KEY");
+        setName("DEFAULT");
     }
 
     public NamedCassandraIdempotentRepository(Session session, String name) {
         super(session);
-        this.name = name;
         setPKColumns("NAME", "KEY");
+        setName(name);
     }
 
     public NamedCassandraIdempotentRepository(Cluster cluster, String 
keyspace, String name) {
         super(cluster, keyspace);
-        this.name = name;
         setPKColumns("NAME", "KEY");
-    }
-
-    @Override
-    protected Object[] getPKValues(K key) {
-        return new Object[]{name, key};
+        setName(name);
     }
 
     public String getName() {
-        return name;
+        return (String) getPrefixPKValues()[0];
     }
 
-    public void setName(String name) {
-        this.name = name;
+    public final void setName(String name) {
+        setPrefixPKValues(new String[]{name});
     }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/94ebb0c5/components/camel-cassandraql/src/main/java/org/apache/camel/utils/cassandra/CassandraUtils.java
----------------------------------------------------------------------
diff --git 
a/components/camel-cassandraql/src/main/java/org/apache/camel/utils/cassandra/CassandraUtils.java
 
b/components/camel-cassandraql/src/main/java/org/apache/camel/utils/cassandra/CassandraUtils.java
index defc169..6a0722d 100644
--- 
a/components/camel-cassandraql/src/main/java/org/apache/camel/utils/cassandra/CassandraUtils.java
+++ 
b/components/camel-cassandraql/src/main/java/org/apache/camel/utils/cassandra/CassandraUtils.java
@@ -32,21 +32,42 @@ public class CassandraUtils {
         }
         return statement;
     }
-
+    private static boolean isEmpty(Object[] array) {
+        return array == null || array.length == 0;
+    }
     /**
      * Concatenate 2 arrays.
      */
     public static Object[] concat(Object[] array1, Object[] array2) {
+        if (isEmpty(array1)) {
+            return array2;
+        }
+        if (isEmpty(array2)) {
+            return array1;
+        }
         Object[] array = new Object[array1.length + array2.length];
         System.arraycopy(array1, 0, array, 0, array1.length);
         System.arraycopy(array2, 0, array, array1.length, array2.length);
         return array;
     }
 
+    private static int size(String[] array) {
+        return array == null ? 0 : array.length;
+    }
+
+    private static boolean isEmpty(String[] array) {
+        return size(array) == 0;
+    }
     /**
      * Concatenate 2 arrays.
      */
     public static String[] concat(String[] array1, String[] array2) {
+        if (isEmpty(array1)) {
+            return array2;
+        }
+        if (isEmpty(array2)) {
+            return array1;
+        }
         String[] array = new String[array1.length + array2.length];
         System.arraycopy(array1, 0, array, 0, array1.length);
         System.arraycopy(array2, 0, array, array1.length, array2.length);
@@ -90,6 +111,9 @@ public class CassandraUtils {
      * Append where columns = ? to CQL.
      */
     public static void appendWhere(StringBuilder cqlBuilder, String[] columns, 
int maxColumnIndex) {
+        if (isEmpty(columns) || maxColumnIndex<= 0) {
+            return;
+        }
         cqlBuilder.append(" where ");
         appendColumns(cqlBuilder, columns, "=? and ", maxColumnIndex);
         cqlBuilder.append("=?");
@@ -132,12 +156,11 @@ public class CassandraUtils {
         }
         return cqlBuilder;
     }
-
     /**
      * Generate select where columns = ? CQL.
      */
     public static StringBuilder generateSelect(String table, String[] 
selectColumns, String[] whereColumns) {
-        return generateSelect(table, selectColumns, whereColumns, 
whereColumns.length);
+        return generateSelect(table, selectColumns, whereColumns, 
size(whereColumns));
     }
 
     /**
@@ -155,7 +178,7 @@ public class CassandraUtils {
      * Generate delete where columns = ? CQL.
      */
     public static StringBuilder generateDelete(String table, String[] 
whereColumns, boolean ifExists) {
-        return generateDelete(table, whereColumns, whereColumns.length, 
ifExists);
+        return generateDelete(table, whereColumns, size(whereColumns), 
ifExists);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/camel/blob/94ebb0c5/components/camel-cassandraql/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepositoryTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-cassandraql/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepositoryTest.java
 
b/components/camel-cassandraql/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepositoryTest.java
index 15714ac..b0c7195 100644
--- 
a/components/camel-cassandraql/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepositoryTest.java
+++ 
b/components/camel-cassandraql/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepositoryTest.java
@@ -61,7 +61,7 @@ public class CassandraAggregationRepositoryTest {
         camelContext = new DefaultCamelContext();
         cluster = CassandraUnitUtils.cassandraCluster();
         session = cluster.connect(CassandraUnitUtils.KEYSPACE);
-        aggregationRepository = new 
NamedCassandraAggregationRepository(session, "ID");
+        aggregationRepository = new CassandraAggregationRepository(session);
         aggregationRepository.start();
     }
 
@@ -79,7 +79,7 @@ public class CassandraAggregationRepositoryTest {
 
     private boolean exists(String key) {
         return session.execute(
-                "select KEY from CAMEL_AGGREGATION where NAME=? and KEY=?", 
"ID", key)
+                "select KEY from CAMEL_AGGREGATION where KEY=?", key)
                 .one() != null;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/94ebb0c5/components/camel-cassandraql/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-cassandraql/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationTest.java
 
b/components/camel-cassandraql/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationTest.java
index 0302663..d3f9208 100644
--- 
a/components/camel-cassandraql/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationTest.java
+++ 
b/components/camel-cassandraql/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationTest.java
@@ -38,9 +38,10 @@ public class CassandraAggregationTest extends 
CamelTestSupport {
         CassandraUnitUtils.startEmbeddedCassandra();
         cluster = CassandraUnitUtils.cassandraCluster();
         Session rootSession = cluster.connect();
-        CassandraUnitUtils.loadCQLDataSet(rootSession, 
"AggregationDataSet.cql");
+        CassandraUnitUtils.loadCQLDataSet(rootSession, 
"NamedAggregationDataSet.cql");
         rootSession.close();
         aggregationRepository = new 
NamedCassandraAggregationRepository(cluster, CassandraUnitUtils.KEYSPACE, "ID");
+        aggregationRepository.setTable("NAMED_CAMEL_AGGREGATION");
         aggregationRepository.start();
         super.doPreSetup();
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/94ebb0c5/components/camel-cassandraql/src/test/java/org/apache/camel/processor/aggregate/cassandra/NamedCassandraAggregationRepositoryTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-cassandraql/src/test/java/org/apache/camel/processor/aggregate/cassandra/NamedCassandraAggregationRepositoryTest.java
 
b/components/camel-cassandraql/src/test/java/org/apache/camel/processor/aggregate/cassandra/NamedCassandraAggregationRepositoryTest.java
new file mode 100644
index 0000000..a4c9a76
--- /dev/null
+++ 
b/components/camel-cassandraql/src/test/java/org/apache/camel/processor/aggregate/cassandra/NamedCassandraAggregationRepositoryTest.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate.cassandra;
+
+import java.util.Set;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.cassandra.CassandraUnitUtils;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.DefaultExchange;
+import org.cassandraunit.CassandraCQLUnit;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unite test for {@link CassandraAggregationRepository}
+ */
+public class NamedCassandraAggregationRepositoryTest {
+    @Rule
+    public CassandraCQLUnit cassandraRule = 
CassandraUnitUtils.cassandraCQLUnit("NamedAggregationDataSet.cql");
+
+    private Cluster cluster;
+    private Session session;
+    private CassandraAggregationRepository aggregationRepository;
+    private CamelContext camelContext;
+
+    @BeforeClass
+    public static void setUpClass() throws Exception {
+        CassandraUnitUtils.startEmbeddedCassandra();
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        camelContext = new DefaultCamelContext();
+        cluster = CassandraUnitUtils.cassandraCluster();
+        session = cluster.connect(CassandraUnitUtils.KEYSPACE);
+        aggregationRepository = new 
NamedCassandraAggregationRepository(session, "ID");
+        aggregationRepository.setTable("NAMED_CAMEL_AGGREGATION");
+        aggregationRepository.start();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        aggregationRepository.stop();
+        session.close();
+        cluster.close();
+    }
+
+    @AfterClass
+    public static void tearDownClass() throws Exception {
+        CassandraUnitUtils.cleanEmbeddedCassandra();
+    }
+
+    private boolean exists(String key) {
+        return session.execute(
+                "select KEY from NAMED_CAMEL_AGGREGATION where NAME=? and 
KEY=?", "ID", key)
+                .one() != null;
+    }
+
+    @Test
+    public void testAdd() {
+        // Given
+        String key = "Add";
+        assertFalse(exists(key));
+        Exchange exchange = new DefaultExchange(camelContext);
+        // When
+        aggregationRepository.add(camelContext, key, exchange);
+        // Then
+        assertTrue(exists(key));
+    }
+
+    @Test
+    public void testGetExists() {
+        // Given
+        String key = "Get_Exists";
+        Exchange exchange = new DefaultExchange(camelContext);
+        aggregationRepository.add(camelContext, key, exchange);
+        assertTrue(exists(key));
+        // When
+        Exchange exchange2 = aggregationRepository.get(camelContext, key);
+        // Then
+        assertNotNull(exchange2);
+        assertEquals(exchange.getExchangeId(), exchange2.getExchangeId());
+    }
+
+    @Test
+    public void testGetNotExists() {
+        // Given
+        String key = "Get_NotExists";
+        assertFalse(exists(key));
+        // When
+        Exchange exchange2 = aggregationRepository.get(camelContext, key);
+        // Then
+        assertNull(exchange2);
+    }
+
+    @Test
+    public void testRemoveExists() {
+        // Given
+        String key = "Remove_Exists";
+        Exchange exchange = new DefaultExchange(camelContext);
+        aggregationRepository.add(camelContext, key, exchange);
+        assertTrue(exists(key));
+        // When
+        aggregationRepository.remove(camelContext, key, exchange);
+        // Then
+        assertFalse(exists(key));
+    }
+
+    @Test
+    public void testRemoveNotExists() {
+        // Given
+        String key = "RemoveNotExists";
+        Exchange exchange = new DefaultExchange(camelContext);
+        assertFalse(exists(key));
+        // When
+        aggregationRepository.remove(camelContext, key, exchange);
+        // Then
+        assertFalse(exists(key));
+    }
+
+    @Test
+    public void testGetKeys() {
+        // Given
+        String[] keys = {"GetKeys1", "GetKeys2"};
+        addExchanges(keys);
+        // When
+        Set<String> keySet = aggregationRepository.getKeys();
+        // Then
+        for (String key : keys) {
+            assertTrue(keySet.contains(key));
+        }
+    }
+
+    @Test
+    public void testConfirmExist() {
+        // Given
+        for (int i = 1; i < 4; i++) {
+            String key = "Confirm_" + i;
+            Exchange exchange = new DefaultExchange(camelContext);
+            exchange.setExchangeId("Exchange_" + i);
+            aggregationRepository.add(camelContext, key, exchange);
+            assertTrue(exists(key));
+        }
+        // When
+        aggregationRepository.confirm(camelContext, "Exchange_2");
+        // Then
+        assertTrue(exists("Confirm_1"));
+        assertFalse(exists("Confirm_2"));
+        assertTrue(exists("Confirm_3"));
+    }
+
+    @Test
+    public void testConfirmNotExist() {
+        // Given
+        String[] keys=new String[3];
+        for (int i = 1; i < 4; i++) {
+            keys[i-1] = "Confirm" + i; 
+        }
+        addExchanges(keys);
+        for(String key:keys) {
+            assertTrue(exists(key));
+        }
+        // When
+        aggregationRepository.confirm(camelContext, "Exchange-Confirm5");
+        // Then
+        for (String key: keys) {
+            assertTrue(exists(key));
+        }
+    }
+    private void addExchanges(String ... keys) {
+        for (String key : keys) {
+            Exchange exchange = new DefaultExchange(camelContext);
+            exchange.setExchangeId("Exchange-"+key);
+            aggregationRepository.add(camelContext, key, exchange);
+        }
+    }
+
+    @Test
+    public void testScan() {
+        // Given
+        String[] keys = {"Scan1", "Scan2"};
+        addExchanges(keys);
+        // When
+        Set<String> exchangeIdSet = aggregationRepository.scan(camelContext);
+        // Then
+        for (String key : keys) {
+            assertTrue(exchangeIdSet.contains("Exchange-"+key));
+        }
+    }
+
+    @Test
+    public void testRecover() {
+        // Given
+        String[] keys = {"Recover1", "Recover2"};
+        addExchanges(keys);
+        // When
+        Exchange exchange2 = aggregationRepository.recover(camelContext, 
"Exchange-Recover2");
+        Exchange exchange3 = aggregationRepository.recover(camelContext, 
"Exchange-Recover3");
+        // Then
+        assertNotNull(exchange2);
+        assertNull(exchange3);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/94ebb0c5/components/camel-cassandraql/src/test/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentRepositoryTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-cassandraql/src/test/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentRepositoryTest.java
 
b/components/camel-cassandraql/src/test/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentRepositoryTest.java
index 89a73b4..2a14380 100644
--- 
a/components/camel-cassandraql/src/test/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentRepositoryTest.java
+++ 
b/components/camel-cassandraql/src/test/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentRepositoryTest.java
@@ -50,7 +50,7 @@ public class CassandraIdempotentRepositoryTest {
     public void setUp() throws Exception {
         cluster = CassandraUnitUtils.cassandraCluster();
         session = cluster.connect(CassandraUnitUtils.KEYSPACE);
-        idempotentRepository = new 
NamedCassandraIdempotentRepository<String>(session, "ID");
+        idempotentRepository = new 
CassandraIdempotentRepository<String>(session);
         idempotentRepository.start();
     }
 
@@ -68,7 +68,7 @@ public class CassandraIdempotentRepositoryTest {
 
     private boolean exists(String key) {
         return session.execute(
-                "select KEY from CAMEL_IDEMPOTENT where NAME=? and KEY=?", 
"ID", key)
+                "select KEY from CAMEL_IDEMPOTENT where KEY=?", key)
                 .one() != null;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/94ebb0c5/components/camel-cassandraql/src/test/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-cassandraql/src/test/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentTest.java
 
b/components/camel-cassandraql/src/test/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentTest.java
index 1380bf9..e8e7338 100644
--- 
a/components/camel-cassandraql/src/test/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentTest.java
+++ 
b/components/camel-cassandraql/src/test/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentTest.java
@@ -36,9 +36,10 @@ public class CassandraIdempotentTest extends 
CamelTestSupport {
         CassandraUnitUtils.startEmbeddedCassandra();
         cluster = CassandraUnitUtils.cassandraCluster();
         Session rootSession = cluster.connect();
-        CassandraUnitUtils.loadCQLDataSet(rootSession, 
"IdempotentDataSet.cql");
+        CassandraUnitUtils.loadCQLDataSet(rootSession, 
"NamedIdempotentDataSet.cql");
         rootSession.close();
         idempotentRepository = new NamedCassandraIdempotentRepository(cluster, 
CassandraUnitUtils.KEYSPACE, "ID");
+        idempotentRepository.setTable("NAMED_CAMEL_IDEMPOTENT");
         idempotentRepository.start();
         super.doPreSetup();
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/94ebb0c5/components/camel-cassandraql/src/test/java/org/apache/camel/processor/idempotent/cassandra/NamedCassandraIdempotentRepositoryTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-cassandraql/src/test/java/org/apache/camel/processor/idempotent/cassandra/NamedCassandraIdempotentRepositoryTest.java
 
b/components/camel-cassandraql/src/test/java/org/apache/camel/processor/idempotent/cassandra/NamedCassandraIdempotentRepositoryTest.java
new file mode 100644
index 0000000..bdb577f
--- /dev/null
+++ 
b/components/camel-cassandraql/src/test/java/org/apache/camel/processor/idempotent/cassandra/NamedCassandraIdempotentRepositoryTest.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.idempotent.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import org.apache.camel.component.cassandra.CassandraUnitUtils;
+import org.cassandraunit.CassandraCQLUnit;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit test for {@link CassandraIdempotentRepository}
+ */
+public class NamedCassandraIdempotentRepositoryTest {
+    @Rule
+    public CassandraCQLUnit cassandraRule = 
CassandraUnitUtils.cassandraCQLUnit("NamedIdempotentDataSet.cql");
+
+    private Cluster cluster;
+    private Session session;
+    private CassandraIdempotentRepository<String> idempotentRepository;
+
+    @BeforeClass
+    public static void setUpClass() throws Exception {
+        CassandraUnitUtils.startEmbeddedCassandra();
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        cluster = CassandraUnitUtils.cassandraCluster();
+        session = cluster.connect(CassandraUnitUtils.KEYSPACE);
+        idempotentRepository = new 
NamedCassandraIdempotentRepository<String>(session, "ID");
+        idempotentRepository.setTable("NAMED_CAMEL_IDEMPOTENT");
+        idempotentRepository.start();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        idempotentRepository.stop();
+        session.close();
+        cluster.close();
+    }
+
+    @AfterClass
+    public static void tearDownClass() throws Exception {
+        CassandraUnitUtils.cleanEmbeddedCassandra();
+    }
+
+    private boolean exists(String key) {
+        return session.execute(
+                "select KEY from NAMED_CAMEL_IDEMPOTENT where NAME=? and 
KEY=?", "ID", key)
+                .one() != null;
+    }
+
+    @Test
+    public void testAddNotExists() {
+        // Given
+        String key = "Add_NotExists";
+        assertFalse(exists(key));
+        // When
+        boolean result = idempotentRepository.add(key);
+        // Then
+        assertTrue(result);
+        assertTrue(exists(key));
+    }
+
+    @Test
+    public void testAddExists() {
+        // Given
+        String key = "Add_Exists";
+        assertTrue(exists(key));
+        // When
+        boolean result = idempotentRepository.add(key);
+        // Then
+        assertFalse(result);
+        assertTrue(exists(key));
+    }
+
+    @Test
+    public void testContainsNotExists() {
+        // Given
+        String key = "Contains_NotExists";
+        assertFalse(exists(key));
+        // When
+        boolean result = idempotentRepository.contains(key);
+        // Then
+        assertFalse(result);
+    }
+
+    @Test
+    public void testContainsExists() {
+        // Given
+        String key = "Contains_Exists";
+        assertTrue(exists(key));
+        // When
+        boolean result = idempotentRepository.contains(key);
+        // Then
+        assertTrue(result);
+    }
+
+    @Test
+    public void testRemoveNotExists() {
+        // Given
+        String key = "Remove_NotExists";
+        assertFalse(exists(key));
+        // When
+        boolean result = idempotentRepository.contains(key);
+        // Then
+        // assertFalse(result);
+    }
+
+    @Test
+    public void testRemoveExists() {
+        // Given
+        String key = "Remove_Exists";
+        assertTrue(exists(key));
+        // When
+        boolean result = idempotentRepository.remove(key);
+        // Then
+        assertTrue(result);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/94ebb0c5/components/camel-cassandraql/src/test/resources/AggregationDataSet.cql
----------------------------------------------------------------------
diff --git 
a/components/camel-cassandraql/src/test/resources/AggregationDataSet.cql 
b/components/camel-cassandraql/src/test/resources/AggregationDataSet.cql
index aa6699a..0ff543a 100644
--- a/components/camel-cassandraql/src/test/resources/AggregationDataSet.cql
+++ b/components/camel-cassandraql/src/test/resources/AggregationDataSet.cql
@@ -1,7 +1,6 @@
 CREATE TABLE CAMEL_AGGREGATION (
-  NAME varchar,
   KEY varchar,
   EXCHANGE_ID varchar,
   EXCHANGE blob,
-  PRIMARY KEY (NAME, KEY)
+  PRIMARY KEY (KEY)
 );

http://git-wip-us.apache.org/repos/asf/camel/blob/94ebb0c5/components/camel-cassandraql/src/test/resources/IdempotentDataSet.cql
----------------------------------------------------------------------
diff --git 
a/components/camel-cassandraql/src/test/resources/IdempotentDataSet.cql 
b/components/camel-cassandraql/src/test/resources/IdempotentDataSet.cql
index 430e920..50a09a7 100644
--- a/components/camel-cassandraql/src/test/resources/IdempotentDataSet.cql
+++ b/components/camel-cassandraql/src/test/resources/IdempotentDataSet.cql
@@ -1,11 +1,10 @@
 CREATE TABLE CAMEL_IDEMPOTENT (
-  NAME varchar,
   KEY varchar,
-  PRIMARY KEY (NAME, KEY)
+  PRIMARY KEY (KEY)
 );
-INSERT INTO CAMEL_IDEMPOTENT(NAME, KEY)
-    VALUES('ID','Add_Exists');
-INSERT INTO CAMEL_IDEMPOTENT(NAME, KEY)
-    VALUES('ID','Contains_Exists');
-INSERT INTO CAMEL_IDEMPOTENT(NAME, KEY)
-    VALUES('ID','Remove_Exists');
+INSERT INTO CAMEL_IDEMPOTENT(KEY)
+    VALUES('Add_Exists');
+INSERT INTO CAMEL_IDEMPOTENT(KEY)
+    VALUES('Contains_Exists');
+INSERT INTO CAMEL_IDEMPOTENT(KEY)
+    VALUES('Remove_Exists');

http://git-wip-us.apache.org/repos/asf/camel/blob/94ebb0c5/components/camel-cassandraql/src/test/resources/NamedAggregationDataSet.cql
----------------------------------------------------------------------
diff --git 
a/components/camel-cassandraql/src/test/resources/NamedAggregationDataSet.cql 
b/components/camel-cassandraql/src/test/resources/NamedAggregationDataSet.cql
new file mode 100644
index 0000000..2b53326
--- /dev/null
+++ 
b/components/camel-cassandraql/src/test/resources/NamedAggregationDataSet.cql
@@ -0,0 +1,7 @@
+CREATE TABLE NAMED_CAMEL_AGGREGATION (
+  NAME varchar,
+  KEY varchar,
+  EXCHANGE_ID varchar,
+  EXCHANGE blob,
+  PRIMARY KEY (NAME, KEY)
+);

http://git-wip-us.apache.org/repos/asf/camel/blob/94ebb0c5/components/camel-cassandraql/src/test/resources/NamedIdempotentDataSet.cql
----------------------------------------------------------------------
diff --git 
a/components/camel-cassandraql/src/test/resources/NamedIdempotentDataSet.cql 
b/components/camel-cassandraql/src/test/resources/NamedIdempotentDataSet.cql
new file mode 100644
index 0000000..14a8e52
--- /dev/null
+++ b/components/camel-cassandraql/src/test/resources/NamedIdempotentDataSet.cql
@@ -0,0 +1,11 @@
+CREATE TABLE NAMED_CAMEL_IDEMPOTENT (
+  NAME varchar,
+  KEY varchar,
+  PRIMARY KEY (NAME, KEY)
+);
+INSERT INTO NAMED_CAMEL_IDEMPOTENT(NAME, KEY)
+    VALUES('ID','Add_Exists');
+INSERT INTO NAMED_CAMEL_IDEMPOTENT(NAME, KEY)
+    VALUES('ID','Contains_Exists');
+INSERT INTO NAMED_CAMEL_IDEMPOTENT(NAME, KEY)
+    VALUES('ID','Remove_Exists');

Reply via email to