CAMEL-2939: Add unnamed repositories for aggregation and idempotent
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/94ebb0c5 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/94ebb0c5 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/94ebb0c5 Branch: refs/heads/master Commit: 94ebb0c5c78c194ce715ddcb178bd87fd7aaccbe Parents: 8a22bd8 Author: Gerald Quintana <gerald.quint...@gmail.com> Authored: Sat Jan 17 22:25:21 2015 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Feb 18 10:00:49 2015 +0100 ---------------------------------------------------------------------- .../CassandraAggregationRepository.java | 41 ++-- .../NamedCassandraAggregationRepository.java | 22 +- .../CassandraIdempotentRepository.java | 21 +- .../NamedCassandraIdempotentRepository.java | 21 +- .../camel/utils/cassandra/CassandraUtils.java | 31 ++- .../CassandraAggregationRepositoryTest.java | 4 +- .../cassandra/CassandraAggregationTest.java | 3 +- ...NamedCassandraAggregationRepositoryTest.java | 232 +++++++++++++++++++ .../CassandraIdempotentRepositoryTest.java | 4 +- .../cassandra/CassandraIdempotentTest.java | 3 +- .../NamedCassandraIdempotentRepositoryTest.java | 144 ++++++++++++ .../src/test/resources/AggregationDataSet.cql | 3 +- .../src/test/resources/IdempotentDataSet.cql | 15 +- .../test/resources/NamedAggregationDataSet.cql | 7 + .../test/resources/NamedIdempotentDataSet.cql | 11 + 15 files changed, 491 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/94ebb0c5/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepository.java ---------------------------------------------------------------------- diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepository.java 
b/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepository.java index e5f689f..40fc257 100644 --- a/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepository.java +++ b/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepository.java @@ -51,7 +51,7 @@ import static org.apache.camel.utils.cassandra.CassandraUtils.generateSelect; * Warning: Cassandra is not the best tool for queuing use cases * See: http://www.datastax.com/dev/blog/cassandra-anti-patterns-queues-and-queue-like-datasets */ -public abstract class CassandraAggregationRepository extends ServiceSupport implements RecoverableAggregationRepository { +public class CassandraAggregationRepository extends ServiceSupport implements RecoverableAggregationRepository { /** * Logger */ @@ -73,9 +73,13 @@ public abstract class CassandraAggregationRepository extends ServiceSupport impl */ private String exchangeColumn = "EXCHANGE"; /** + * Values used as primary key prefix + */ + private Object[] prefixPKValues = new Object[0]; + /** * Primary key columns */ - private String[] pkColumns; + private String[] pkColumns = {"KEY"}; /** * Exchange marshaller/unmarshaller */ @@ -125,15 +129,10 @@ public abstract class CassandraAggregationRepository extends ServiceSupport impl } /** - * Get fixed primary key values. - */ - protected abstract Object[] getPKValues(); - - /** - * Generate primary key values: fixed + aggregation key. + * Generate primary key values from aggregation key. 
*/ protected Object[] getPKValues(String key) { - return append(getPKValues(), key); + return append(prefixPKValues, key); } /** @@ -239,14 +238,13 @@ public abstract class CassandraAggregationRepository extends ServiceSupport impl */ @Override public void confirm(CamelContext camelContext, String exchangeId) { - Object[] pkValues = getPKValues(); String keyColumn = getKeyColumn(); - LOGGER.debug("Selecting Ids {} ", pkValues); + LOGGER.debug("Selecting Ids"); List<Row> rows = selectKeyIds(); for (Row row : rows) { if (row.getString(exchangeIdColumn).equals(exchangeId)) { String key = row.getString(keyColumn); - Object[] cqlParams = append(pkValues, key, exchangeId); + Object[] cqlParams = append(getPKValues(key), exchangeId); LOGGER.debug("Deleting If Id {} ", cqlParams); getSession().execute(deleteIfIdStatement.bind(cqlParams)); } @@ -281,10 +279,9 @@ public abstract class CassandraAggregationRepository extends ServiceSupport impl selectKeyIdStatement = applyConsistencyLevel(getSession().prepare(cql), readConsistencyLevel); } - private List<Row> selectKeyIds() { - Object[] pkValues = getPKValues(); - LOGGER.debug("Selecting keys {}", pkValues); - return getSession().execute(selectKeyIdStatement.bind(pkValues)).all(); + protected List<Row> selectKeyIds() { + LOGGER.debug("Selecting keys {}", getPrefixPKValues()); + return getSession().execute(selectKeyIdStatement.bind(getPrefixPKValues())).all(); } /** @@ -294,7 +291,7 @@ public abstract class CassandraAggregationRepository extends ServiceSupport impl public Set<String> getKeys() { List<Row> rows = selectKeyIds(); Set<String> keys = new HashSet<String>(rows.size()); - String keyColumnName = getPKColumns()[1]; + String keyColumnName = getKeyColumn(); for (Row row : rows) { keys.add(row.getString(keyColumnName)); } @@ -323,7 +320,7 @@ public abstract class CassandraAggregationRepository extends ServiceSupport impl @Override public Exchange recover(CamelContext camelContext, String exchangeId) { List<Row> rows = 
selectKeyIds(); - String keyColumnName = getPKColumns()[1]; + String keyColumnName = getKeyColumn(); String lKey = null; for (Row row : rows) { String lExchangeId = row.getString(exchangeIdColumn); @@ -356,6 +353,14 @@ public abstract class CassandraAggregationRepository extends ServiceSupport impl this.table = table; } + public Object[] getPrefixPKValues() { + return prefixPKValues; + } + + public void setPrefixPKValues(Object ... prefixPKValues) { + this.prefixPKValues = prefixPKValues; + } + public String[] getPKColumns() { return pkColumns; } http://git-wip-us.apache.org/repos/asf/camel/blob/94ebb0c5/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/NamedCassandraAggregationRepository.java ---------------------------------------------------------------------- diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/NamedCassandraAggregationRepository.java b/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/NamedCassandraAggregationRepository.java index 347ed75..f13230f 100644 --- a/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/NamedCassandraAggregationRepository.java +++ b/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/NamedCassandraAggregationRepository.java @@ -16,6 +16,7 @@ */ package org.apache.camel.processor.aggregate.cassandra; + import com.datastax.driver.core.Cluster; import com.datastax.driver.core.Session; @@ -24,38 +25,29 @@ import com.datastax.driver.core.Session; * columns as primary key: name (partition key) and key (clustering key). 
*/ public class NamedCassandraAggregationRepository extends CassandraAggregationRepository { - /** - * Aggregation repository name - */ - private String name; - public NamedCassandraAggregationRepository() { setPKColumns("NAME", "KEY"); + setName("DEFAULT"); } public NamedCassandraAggregationRepository(Session session, String name) { super(session); - this.name = name; setPKColumns("NAME", "KEY"); + setName(name); } public NamedCassandraAggregationRepository(Cluster cluster, String keyspace, String name) { super(cluster, keyspace); - this.name = name; setPKColumns("NAME", "KEY"); - } - - @Override - protected Object[] getPKValues() { - return new Object[]{name}; + setName(name); } public String getName() { - return name; + return (String) getPrefixPKValues()[0]; } - public void setName(String name) { - this.name = name; + public final void setName(String name) { + setPrefixPKValues(name); } } http://git-wip-us.apache.org/repos/asf/camel/blob/94ebb0c5/components/camel-cassandraql/src/main/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentRepository.java ---------------------------------------------------------------------- diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentRepository.java b/components/camel-cassandraql/src/main/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentRepository.java index ddb7168..7b3699d 100644 --- a/components/camel-cassandraql/src/main/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentRepository.java +++ b/components/camel-cassandraql/src/main/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentRepository.java @@ -28,6 +28,7 @@ import org.apache.camel.utils.cassandra.CassandraSessionHolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.camel.utils.cassandra.CassandraUtils.append; import static 
org.apache.camel.utils.cassandra.CassandraUtils.applyConsistencyLevel; import static org.apache.camel.utils.cassandra.CassandraUtils.generateDelete; import static org.apache.camel.utils.cassandra.CassandraUtils.generateInsert; @@ -42,7 +43,7 @@ import static org.apache.camel.utils.cassandra.CassandraUtils.generateSelect; * * @param <K> Message Id */ -public abstract class CassandraIdempotentRepository<K> extends ServiceSupport implements IdempotentRepository<K> { +public class CassandraIdempotentRepository<K> extends ServiceSupport implements IdempotentRepository<K> { /** * Logger */ @@ -56,9 +57,13 @@ public abstract class CassandraIdempotentRepository<K> extends ServiceSupport im */ private String table = "CAMEL_IDEMPOTENT"; /** + * Values used as primary key prefix + */ + private Object[] prefixPKValues = new Object[0]; + /** * Primary key columns */ - private String[] pkColumns; + private String[] pkColumns = {"KEY"}; /** * Time to live in seconds used for inserts */ @@ -97,7 +102,9 @@ public abstract class CassandraIdempotentRepository<K> extends ServiceSupport im } } - protected abstract Object[] getPKValues(K key); + protected Object[] getPKValues(K key) { + return append(prefixPKValues, key); + } // ------------------------------------------------------------------------- // Lifecycle methods @@ -217,4 +224,12 @@ public abstract class CassandraIdempotentRepository<K> extends ServiceSupport im this.readConsistencyLevel = readConsistencyLevel; } + public Object[] getPrefixPKValues() { + return prefixPKValues; + } + + public void setPrefixPKValues(Object[] prefixPKValues) { + this.prefixPKValues = prefixPKValues; + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/94ebb0c5/components/camel-cassandraql/src/main/java/org/apache/camel/processor/idempotent/cassandra/NamedCassandraIdempotentRepository.java ---------------------------------------------------------------------- diff --git 
a/components/camel-cassandraql/src/main/java/org/apache/camel/processor/idempotent/cassandra/NamedCassandraIdempotentRepository.java b/components/camel-cassandraql/src/main/java/org/apache/camel/processor/idempotent/cassandra/NamedCassandraIdempotentRepository.java index 73219fc..6a10212 100644 --- a/components/camel-cassandraql/src/main/java/org/apache/camel/processor/idempotent/cassandra/NamedCassandraIdempotentRepository.java +++ b/components/camel-cassandraql/src/main/java/org/apache/camel/processor/idempotent/cassandra/NamedCassandraIdempotentRepository.java @@ -24,38 +24,29 @@ import com.datastax.driver.core.Session; * columns as primary key: name (partition key) and key (clustering key). */ public class NamedCassandraIdempotentRepository<K> extends CassandraIdempotentRepository<K> { - /** - * Idempotent repository name - */ - private String name; - public NamedCassandraIdempotentRepository() { setPKColumns("NAME", "KEY"); + setName("DEFAULT"); } public NamedCassandraIdempotentRepository(Session session, String name) { super(session); - this.name = name; setPKColumns("NAME", "KEY"); + setName(name); } public NamedCassandraIdempotentRepository(Cluster cluster, String keyspace, String name) { super(cluster, keyspace); - this.name = name; setPKColumns("NAME", "KEY"); - } - - @Override - protected Object[] getPKValues(K key) { - return new Object[]{name, key}; + setName(name); } public String getName() { - return name; + return (String) getPrefixPKValues()[0]; } - public void setName(String name) { - this.name = name; + public final void setName(String name) { + setPrefixPKValues(new String[]{name}); } } http://git-wip-us.apache.org/repos/asf/camel/blob/94ebb0c5/components/camel-cassandraql/src/main/java/org/apache/camel/utils/cassandra/CassandraUtils.java ---------------------------------------------------------------------- diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/utils/cassandra/CassandraUtils.java 
b/components/camel-cassandraql/src/main/java/org/apache/camel/utils/cassandra/CassandraUtils.java index defc169..6a0722d 100644 --- a/components/camel-cassandraql/src/main/java/org/apache/camel/utils/cassandra/CassandraUtils.java +++ b/components/camel-cassandraql/src/main/java/org/apache/camel/utils/cassandra/CassandraUtils.java @@ -32,21 +32,42 @@ public class CassandraUtils { } return statement; } - + private static boolean isEmpty(Object[] array) { + return array == null || array.length == 0; + } /** * Concatenate 2 arrays. */ public static Object[] concat(Object[] array1, Object[] array2) { + if (isEmpty(array1)) { + return array2; + } + if (isEmpty(array2)) { + return array1; + } Object[] array = new Object[array1.length + array2.length]; System.arraycopy(array1, 0, array, 0, array1.length); System.arraycopy(array2, 0, array, array1.length, array2.length); return array; } + private static int size(String[] array) { + return array == null ? 0 : array.length; + } + + private static boolean isEmpty(String[] array) { + return size(array) == 0; + } /** * Concatenate 2 arrays. */ public static String[] concat(String[] array1, String[] array2) { + if (isEmpty(array1)) { + return array2; + } + if (isEmpty(array2)) { + return array1; + } String[] array = new String[array1.length + array2.length]; System.arraycopy(array1, 0, array, 0, array1.length); System.arraycopy(array2, 0, array, array1.length, array2.length); @@ -90,6 +111,9 @@ public class CassandraUtils { * Append where columns = ? to CQL. */ public static void appendWhere(StringBuilder cqlBuilder, String[] columns, int maxColumnIndex) { + if (isEmpty(columns) || maxColumnIndex<= 0) { + return; + } cqlBuilder.append(" where "); appendColumns(cqlBuilder, columns, "=? and ", maxColumnIndex); cqlBuilder.append("=?"); @@ -132,12 +156,11 @@ public class CassandraUtils { } return cqlBuilder; } - /** * Generate select where columns = ? CQL. 
*/ public static StringBuilder generateSelect(String table, String[] selectColumns, String[] whereColumns) { - return generateSelect(table, selectColumns, whereColumns, whereColumns.length); + return generateSelect(table, selectColumns, whereColumns, size(whereColumns)); } /** @@ -155,7 +178,7 @@ public class CassandraUtils { * Generate delete where columns = ? CQL. */ public static StringBuilder generateDelete(String table, String[] whereColumns, boolean ifExists) { - return generateDelete(table, whereColumns, whereColumns.length, ifExists); + return generateDelete(table, whereColumns, size(whereColumns), ifExists); } /** http://git-wip-us.apache.org/repos/asf/camel/blob/94ebb0c5/components/camel-cassandraql/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepositoryTest.java ---------------------------------------------------------------------- diff --git a/components/camel-cassandraql/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepositoryTest.java b/components/camel-cassandraql/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepositoryTest.java index 15714ac..b0c7195 100644 --- a/components/camel-cassandraql/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepositoryTest.java +++ b/components/camel-cassandraql/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepositoryTest.java @@ -61,7 +61,7 @@ public class CassandraAggregationRepositoryTest { camelContext = new DefaultCamelContext(); cluster = CassandraUnitUtils.cassandraCluster(); session = cluster.connect(CassandraUnitUtils.KEYSPACE); - aggregationRepository = new NamedCassandraAggregationRepository(session, "ID"); + aggregationRepository = new CassandraAggregationRepository(session); aggregationRepository.start(); } @@ -79,7 +79,7 @@ public class CassandraAggregationRepositoryTest { private boolean exists(String key) { return session.execute( - 
"select KEY from CAMEL_AGGREGATION where NAME=? and KEY=?", "ID", key) + "select KEY from CAMEL_AGGREGATION where KEY=?", key) .one() != null; } http://git-wip-us.apache.org/repos/asf/camel/blob/94ebb0c5/components/camel-cassandraql/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationTest.java ---------------------------------------------------------------------- diff --git a/components/camel-cassandraql/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationTest.java b/components/camel-cassandraql/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationTest.java index 0302663..d3f9208 100644 --- a/components/camel-cassandraql/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationTest.java +++ b/components/camel-cassandraql/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationTest.java @@ -38,9 +38,10 @@ public class CassandraAggregationTest extends CamelTestSupport { CassandraUnitUtils.startEmbeddedCassandra(); cluster = CassandraUnitUtils.cassandraCluster(); Session rootSession = cluster.connect(); - CassandraUnitUtils.loadCQLDataSet(rootSession, "AggregationDataSet.cql"); + CassandraUnitUtils.loadCQLDataSet(rootSession, "NamedAggregationDataSet.cql"); rootSession.close(); aggregationRepository = new NamedCassandraAggregationRepository(cluster, CassandraUnitUtils.KEYSPACE, "ID"); + aggregationRepository.setTable("NAMED_CAMEL_AGGREGATION"); aggregationRepository.start(); super.doPreSetup(); } http://git-wip-us.apache.org/repos/asf/camel/blob/94ebb0c5/components/camel-cassandraql/src/test/java/org/apache/camel/processor/aggregate/cassandra/NamedCassandraAggregationRepositoryTest.java ---------------------------------------------------------------------- diff --git a/components/camel-cassandraql/src/test/java/org/apache/camel/processor/aggregate/cassandra/NamedCassandraAggregationRepositoryTest.java 
b/components/camel-cassandraql/src/test/java/org/apache/camel/processor/aggregate/cassandra/NamedCassandraAggregationRepositoryTest.java new file mode 100644 index 0000000..a4c9a76 --- /dev/null +++ b/components/camel-cassandraql/src/test/java/org/apache/camel/processor/aggregate/cassandra/NamedCassandraAggregationRepositoryTest.java @@ -0,0 +1,232 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.processor.aggregate.cassandra; + +import java.util.Set; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Session; +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.component.cassandra.CassandraUnitUtils; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.impl.DefaultExchange; +import org.cassandraunit.CassandraCQLUnit; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +/** + * Unit test for {@link CassandraAggregationRepository} + */ +public class NamedCassandraAggregationRepositoryTest { + @Rule + public CassandraCQLUnit cassandraRule = CassandraUnitUtils.cassandraCQLUnit("NamedAggregationDataSet.cql"); + + private Cluster cluster; + private Session session; + private CassandraAggregationRepository aggregationRepository; + private CamelContext camelContext; + + @BeforeClass + public static void setUpClass() throws Exception { + CassandraUnitUtils.startEmbeddedCassandra(); + } + + @Before + public void setUp() throws Exception { + camelContext = new DefaultCamelContext(); + cluster = CassandraUnitUtils.cassandraCluster(); + session = cluster.connect(CassandraUnitUtils.KEYSPACE); + aggregationRepository = new NamedCassandraAggregationRepository(session, "ID"); + aggregationRepository.setTable("NAMED_CAMEL_AGGREGATION"); + aggregationRepository.start(); + } + + @After + public void tearDown() throws Exception { + aggregationRepository.stop(); + session.close(); + cluster.close(); + } + + @AfterClass + public static void tearDownClass() throws Exception { + CassandraUnitUtils.cleanEmbeddedCassandra(); + 
} + + private boolean exists(String key) { + return session.execute( + "select KEY from NAMED_CAMEL_AGGREGATION where NAME=? and KEY=?", "ID", key) + .one() != null; + } + + @Test + public void testAdd() { + // Given + String key = "Add"; + assertFalse(exists(key)); + Exchange exchange = new DefaultExchange(camelContext); + // When + aggregationRepository.add(camelContext, key, exchange); + // Then + assertTrue(exists(key)); + } + + @Test + public void testGetExists() { + // Given + String key = "Get_Exists"; + Exchange exchange = new DefaultExchange(camelContext); + aggregationRepository.add(camelContext, key, exchange); + assertTrue(exists(key)); + // When + Exchange exchange2 = aggregationRepository.get(camelContext, key); + // Then + assertNotNull(exchange2); + assertEquals(exchange.getExchangeId(), exchange2.getExchangeId()); + } + + @Test + public void testGetNotExists() { + // Given + String key = "Get_NotExists"; + assertFalse(exists(key)); + // When + Exchange exchange2 = aggregationRepository.get(camelContext, key); + // Then + assertNull(exchange2); + } + + @Test + public void testRemoveExists() { + // Given + String key = "Remove_Exists"; + Exchange exchange = new DefaultExchange(camelContext); + aggregationRepository.add(camelContext, key, exchange); + assertTrue(exists(key)); + // When + aggregationRepository.remove(camelContext, key, exchange); + // Then + assertFalse(exists(key)); + } + + @Test + public void testRemoveNotExists() { + // Given + String key = "RemoveNotExists"; + Exchange exchange = new DefaultExchange(camelContext); + assertFalse(exists(key)); + // When + aggregationRepository.remove(camelContext, key, exchange); + // Then + assertFalse(exists(key)); + } + + @Test + public void testGetKeys() { + // Given + String[] keys = {"GetKeys1", "GetKeys2"}; + addExchanges(keys); + // When + Set<String> keySet = aggregationRepository.getKeys(); + // Then + for (String key : keys) { + assertTrue(keySet.contains(key)); + } + } + + @Test + public 
void testConfirmExist() { + // Given + for (int i = 1; i < 4; i++) { + String key = "Confirm_" + i; + Exchange exchange = new DefaultExchange(camelContext); + exchange.setExchangeId("Exchange_" + i); + aggregationRepository.add(camelContext, key, exchange); + assertTrue(exists(key)); + } + // When + aggregationRepository.confirm(camelContext, "Exchange_2"); + // Then + assertTrue(exists("Confirm_1")); + assertFalse(exists("Confirm_2")); + assertTrue(exists("Confirm_3")); + } + + @Test + public void testConfirmNotExist() { + // Given + String[] keys=new String[3]; + for (int i = 1; i < 4; i++) { + keys[i-1] = "Confirm" + i; + } + addExchanges(keys); + for(String key:keys) { + assertTrue(exists(key)); + } + // When + aggregationRepository.confirm(camelContext, "Exchange-Confirm5"); + // Then + for (String key: keys) { + assertTrue(exists(key)); + } + } + private void addExchanges(String ... keys) { + for (String key : keys) { + Exchange exchange = new DefaultExchange(camelContext); + exchange.setExchangeId("Exchange-"+key); + aggregationRepository.add(camelContext, key, exchange); + } + } + + @Test + public void testScan() { + // Given + String[] keys = {"Scan1", "Scan2"}; + addExchanges(keys); + // When + Set<String> exchangeIdSet = aggregationRepository.scan(camelContext); + // Then + for (String key : keys) { + assertTrue(exchangeIdSet.contains("Exchange-"+key)); + } + } + + @Test + public void testRecover() { + // Given + String[] keys = {"Recover1", "Recover2"}; + addExchanges(keys); + // When + Exchange exchange2 = aggregationRepository.recover(camelContext, "Exchange-Recover2"); + Exchange exchange3 = aggregationRepository.recover(camelContext, "Exchange-Recover3"); + // Then + assertNotNull(exchange2); + assertNull(exchange3); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/94ebb0c5/components/camel-cassandraql/src/test/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentRepositoryTest.java 
---------------------------------------------------------------------- diff --git a/components/camel-cassandraql/src/test/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentRepositoryTest.java b/components/camel-cassandraql/src/test/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentRepositoryTest.java index 89a73b4..2a14380 100644 --- a/components/camel-cassandraql/src/test/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentRepositoryTest.java +++ b/components/camel-cassandraql/src/test/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentRepositoryTest.java @@ -50,7 +50,7 @@ public class CassandraIdempotentRepositoryTest { public void setUp() throws Exception { cluster = CassandraUnitUtils.cassandraCluster(); session = cluster.connect(CassandraUnitUtils.KEYSPACE); - idempotentRepository = new NamedCassandraIdempotentRepository<String>(session, "ID"); + idempotentRepository = new CassandraIdempotentRepository<String>(session); idempotentRepository.start(); } @@ -68,7 +68,7 @@ public class CassandraIdempotentRepositoryTest { private boolean exists(String key) { return session.execute( - "select KEY from CAMEL_IDEMPOTENT where NAME=? 
and KEY=?", "ID", key) + "select KEY from CAMEL_IDEMPOTENT where KEY=?", key) .one() != null; } http://git-wip-us.apache.org/repos/asf/camel/blob/94ebb0c5/components/camel-cassandraql/src/test/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentTest.java ---------------------------------------------------------------------- diff --git a/components/camel-cassandraql/src/test/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentTest.java b/components/camel-cassandraql/src/test/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentTest.java index 1380bf9..e8e7338 100644 --- a/components/camel-cassandraql/src/test/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentTest.java +++ b/components/camel-cassandraql/src/test/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentTest.java @@ -36,9 +36,10 @@ public class CassandraIdempotentTest extends CamelTestSupport { CassandraUnitUtils.startEmbeddedCassandra(); cluster = CassandraUnitUtils.cassandraCluster(); Session rootSession = cluster.connect(); - CassandraUnitUtils.loadCQLDataSet(rootSession, "IdempotentDataSet.cql"); + CassandraUnitUtils.loadCQLDataSet(rootSession, "NamedIdempotentDataSet.cql"); rootSession.close(); idempotentRepository = new NamedCassandraIdempotentRepository(cluster, CassandraUnitUtils.KEYSPACE, "ID"); + idempotentRepository.setTable("NAMED_CAMEL_IDEMPOTENT"); idempotentRepository.start(); super.doPreSetup(); } http://git-wip-us.apache.org/repos/asf/camel/blob/94ebb0c5/components/camel-cassandraql/src/test/java/org/apache/camel/processor/idempotent/cassandra/NamedCassandraIdempotentRepositoryTest.java ---------------------------------------------------------------------- diff --git a/components/camel-cassandraql/src/test/java/org/apache/camel/processor/idempotent/cassandra/NamedCassandraIdempotentRepositoryTest.java 
b/components/camel-cassandraql/src/test/java/org/apache/camel/processor/idempotent/cassandra/NamedCassandraIdempotentRepositoryTest.java new file mode 100644 index 0000000..bdb577f --- /dev/null +++ b/components/camel-cassandraql/src/test/java/org/apache/camel/processor/idempotent/cassandra/NamedCassandraIdempotentRepositoryTest.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.camel.processor.idempotent.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import org.apache.camel.component.cassandra.CassandraUnitUtils;
+import org.cassandraunit.CassandraCQLUnit;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit test for {@link NamedCassandraIdempotentRepository} (repository name "ID", table NAMED_CAMEL_IDEMPOTENT).
+ */
+public class NamedCassandraIdempotentRepositoryTest {
+    @Rule
+    public CassandraCQLUnit cassandraRule = CassandraUnitUtils.cassandraCQLUnit("NamedIdempotentDataSet.cql");
+
+    private Cluster cluster;
+    private Session session;
+    private CassandraIdempotentRepository<String> idempotentRepository;
+
+    @BeforeClass
+    public static void setUpClass() throws Exception {
+        CassandraUnitUtils.startEmbeddedCassandra();
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        cluster = CassandraUnitUtils.cassandraCluster();
+        session = cluster.connect(CassandraUnitUtils.KEYSPACE);
+        idempotentRepository = new NamedCassandraIdempotentRepository<String>(session, "ID");
+        idempotentRepository.setTable("NAMED_CAMEL_IDEMPOTENT");
+        idempotentRepository.start();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        idempotentRepository.stop();
+        session.close();
+        cluster.close();
+    }
+
+    @AfterClass
+    public static void tearDownClass() throws Exception {
+        CassandraUnitUtils.cleanEmbeddedCassandra();
+    }
+
+    private boolean exists(String key) {
+        return session.execute(
+                "select KEY from NAMED_CAMEL_IDEMPOTENT where NAME=? and KEY=?", "ID", key)
+                .one() != null;
+    }
+
+    @Test
+    public void testAddNotExists() {
+        // Given: a key with no row under name "ID"
+        String key = "Add_NotExists";
+        assertFalse(exists(key));
+        // When
+        boolean result = idempotentRepository.add(key);
+        // Then: add reports true and the row is now present
+        assertTrue(result);
+        assertTrue(exists(key));
+    }
+
+    @Test
+    public void testAddExists() {
+        // Given: a key inserted by NamedIdempotentDataSet.cql
+        String key = "Add_Exists";
+        assertTrue(exists(key));
+        // When
+        boolean result = idempotentRepository.add(key);
+        // Then: add reports false and the row is still present
+        assertFalse(result);
+        assertTrue(exists(key));
+    }
+
+    @Test
+    public void testContainsNotExists() {
+        // Given: a key with no row under name "ID"
+        String key = "Contains_NotExists";
+        assertFalse(exists(key));
+        // When
+        boolean result = idempotentRepository.contains(key);
+        // Then
+        assertFalse(result);
+    }
+
+    @Test
+    public void testContainsExists() {
+        // Given: a key inserted by NamedIdempotentDataSet.cql
+        String key = "Contains_Exists";
+        assertTrue(exists(key));
+        // When
+        boolean result = idempotentRepository.contains(key);
+        // Then
+        assertTrue(result);
+    }
+
+    @Test
+    public void testRemoveNotExists() {
+        // Given: a key with no row under name "ID"
+        String key = "Remove_NotExists";
+        assertFalse(exists(key));
+        // When: remove is invoked on the missing key (its return value is not asserted here)
+        idempotentRepository.remove(key);
+        // Then: the call completed and the key is still absent
+        assertFalse(exists(key));
+    }
+
+    @Test
+    public void testRemoveExists() {
+        // Given: a key inserted by NamedIdempotentDataSet.cql
+        String key = "Remove_Exists";
+        assertTrue(exists(key));
+        // When
+        boolean result = idempotentRepository.remove(key);
+        // Then
+        assertTrue(result);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/94ebb0c5/components/camel-cassandraql/src/test/resources/AggregationDataSet.cql
----------------------------------------------------------------------
diff --git a/components/camel-cassandraql/src/test/resources/AggregationDataSet.cql b/components/camel-cassandraql/src/test/resources/AggregationDataSet.cql
index aa6699a..0ff543a 100644
--- a/components/camel-cassandraql/src/test/resources/AggregationDataSet.cql
+++ b/components/camel-cassandraql/src/test/resources/AggregationDataSet.cql
@@ -1,7 +1,6 @@
 CREATE TABLE CAMEL_AGGREGATION (
-    NAME varchar,
     KEY varchar,
     EXCHANGE_ID varchar,
     EXCHANGE blob,
-    PRIMARY KEY (NAME, KEY)
+    PRIMARY KEY (KEY)
 );
http://git-wip-us.apache.org/repos/asf/camel/blob/94ebb0c5/components/camel-cassandraql/src/test/resources/IdempotentDataSet.cql
----------------------------------------------------------------------
diff --git a/components/camel-cassandraql/src/test/resources/IdempotentDataSet.cql b/components/camel-cassandraql/src/test/resources/IdempotentDataSet.cql
index 430e920..50a09a7 100644
--- a/components/camel-cassandraql/src/test/resources/IdempotentDataSet.cql
+++ b/components/camel-cassandraql/src/test/resources/IdempotentDataSet.cql
@@ -1,11 +1,10 @@
 CREATE TABLE CAMEL_IDEMPOTENT (
-    NAME varchar,
     KEY varchar,
-    PRIMARY KEY (NAME, KEY)
+    PRIMARY KEY (KEY)
 );
-INSERT INTO CAMEL_IDEMPOTENT(NAME, KEY)
-    VALUES('ID','Add_Exists');
-INSERT INTO CAMEL_IDEMPOTENT(NAME, KEY)
-    VALUES('ID','Contains_Exists');
-INSERT INTO CAMEL_IDEMPOTENT(NAME, KEY)
-    VALUES('ID','Remove_Exists');
+INSERT INTO CAMEL_IDEMPOTENT(KEY)
+    VALUES('Add_Exists');
+INSERT INTO CAMEL_IDEMPOTENT(KEY)
+    VALUES('Contains_Exists');
+INSERT INTO CAMEL_IDEMPOTENT(KEY)
+    VALUES('Remove_Exists');
http://git-wip-us.apache.org/repos/asf/camel/blob/94ebb0c5/components/camel-cassandraql/src/test/resources/NamedAggregationDataSet.cql
----------------------------------------------------------------------
diff --git a/components/camel-cassandraql/src/test/resources/NamedAggregationDataSet.cql b/components/camel-cassandraql/src/test/resources/NamedAggregationDataSet.cql
new file mode 100644
index 0000000..2b53326
--- /dev/null
+++ b/components/camel-cassandraql/src/test/resources/NamedAggregationDataSet.cql
@@ -0,0 +1,7 @@
+CREATE TABLE NAMED_CAMEL_AGGREGATION (
+    NAME varchar,
+    KEY varchar,
+    EXCHANGE_ID varchar,
+    EXCHANGE blob,
+    PRIMARY KEY (NAME, KEY)
+);
http://git-wip-us.apache.org/repos/asf/camel/blob/94ebb0c5/components/camel-cassandraql/src/test/resources/NamedIdempotentDataSet.cql
----------------------------------------------------------------------
diff --git a/components/camel-cassandraql/src/test/resources/NamedIdempotentDataSet.cql b/components/camel-cassandraql/src/test/resources/NamedIdempotentDataSet.cql
new file mode 100644
index 0000000..14a8e52
--- /dev/null
+++ b/components/camel-cassandraql/src/test/resources/NamedIdempotentDataSet.cql
@@ -0,0 +1,11 @@
+CREATE TABLE NAMED_CAMEL_IDEMPOTENT (
+    NAME varchar,
+    KEY varchar,
+    PRIMARY KEY (NAME, KEY)
+);
+INSERT INTO NAMED_CAMEL_IDEMPOTENT(NAME, KEY)
+    VALUES('ID','Add_Exists');
+INSERT INTO NAMED_CAMEL_IDEMPOTENT(NAME, KEY)
+    VALUES('ID','Contains_Exists');
+INSERT INTO NAMED_CAMEL_IDEMPOTENT(NAME, KEY)
+    VALUES('ID','Remove_Exists');