CAMEL-2939: Support for RegularStatement in messages processed by Producer
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/81f4b94d Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/81f4b94d Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/81f4b94d Branch: refs/heads/master Commit: 81f4b94d2bf21b1e27306f4c97dab7ab9456407c Parents: d4419bc Author: Gerald Quintana <gerald.quint...@gmail.com> Authored: Sun Jan 18 21:29:41 2015 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Feb 18 10:00:49 2015 +0100 ---------------------------------------------------------------------- .../component/cassandra/CassandraProducer.java | 43 ++++++++++++++------ .../camel/utils/cassandra/CassandraUtils.java | 5 ++- .../CassandraComponentProducerTest.java | 25 ++++++++++++ ...assandraComponentProducerUnpreparedTest.java | 30 ++++++++++++++ 4 files changed, 90 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/81f4b94d/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraProducer.java b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraProducer.java index 4d1d39b..394283c 100644 --- a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraProducer.java +++ b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraProducer.java @@ -17,6 +17,7 @@ package org.apache.camel.component.cassandra; import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.RegularStatement; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Session; import org.apache.camel.Exchange; @@ -26,6 +27,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collection; +import static org.apache.camel.utils.cassandra.CassandraUtils.isEmpty; /** * Cassandra 2 CQL3 producer. @@ -87,7 +89,11 @@ public class CassandraProducer extends DefaultProducer { * Execute CQL query using incoming message body has statement parameters. */ private ResultSet execute(Message message) { - String messageCql = message.getHeader(CassandraConstants.CQL_QUERY, String.class); + Object messageCql = message.getHeader(CassandraConstants.CQL_QUERY); + // Convert Empty string to null + if (messageCql instanceof String && ((String) messageCql).isEmpty()) { + messageCql = null; + } Object[] cqlParams = getCqlParams(message); ResultSet resultSet; @@ -103,17 +109,22 @@ public class CassandraProducer extends DefaultProducer { /** * Execute CQL as PreparedStatement */ - private ResultSet executePreparedStatement(Session session, String messageCql, Object[] cqlParams) { + private ResultSet executePreparedStatement(Session session, Object messageCql, Object[] cqlParams) { ResultSet resultSet; PreparedStatement lPreparedStatement; - if (messageCql == null || messageCql.isEmpty()) { + if (messageCql == null) { // URI CQL lPreparedStatement = this.preparedStatement; - } else { + } else if (messageCql instanceof String) { // Message CQL - lPreparedStatement = getEndpoint().prepareStatement(messageCql); + lPreparedStatement = getEndpoint().prepareStatement((String) messageCql); + } else if (messageCql instanceof RegularStatement) { + // Message Statement + lPreparedStatement = getEndpoint().getSession().prepare((RegularStatement) messageCql); + } else { + throw new IllegalArgumentException("Invalid "+CassandraConstants.CQL_QUERY+" header"); } - if (cqlParams == null || cqlParams.length == 0) { + if (isEmpty(cqlParams)) { resultSet = session.execute(lPreparedStatement.bind()); } else { resultSet = session.execute(lPreparedStatement.bind(cqlParams)); @@ -124,17 +135,25 @@ public class CassandraProducer extends DefaultProducer { /** * Execute CQL as is */ - private ResultSet executeStatement(Session session, String messageCql, Object[] cqlParams) { + private ResultSet executeStatement(Session session, Object messageCql, Object[] cqlParams) { ResultSet resultSet; - String cql; - if (messageCql == null || messageCql.isEmpty()) { + String cql = null; + RegularStatement statement = null; + if (messageCql == null) { // URI CQL cql = getEndpoint().getCql(); - } else { + } else if (messageCql instanceof String) { // Message CQL - cql = messageCql; + cql = (String) messageCql; + } else if (messageCql instanceof RegularStatement) { + // Message Statement + statement = (RegularStatement) messageCql; + } else { + throw new IllegalArgumentException("Invalid "+CassandraConstants.CQL_QUERY+" header"); } - if (cqlParams == null || cqlParams.length == 0) { + if (statement != null) { + resultSet = session.execute(statement); + } else if (isEmpty(cqlParams)) { resultSet = session.execute(cql); } else { resultSet = session.execute(cql, cqlParams); http://git-wip-us.apache.org/repos/asf/camel/blob/81f4b94d/components/camel-cassandraql/src/main/java/org/apache/camel/utils/cassandra/CassandraUtils.java ---------------------------------------------------------------------- diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/utils/cassandra/CassandraUtils.java b/components/camel-cassandraql/src/main/java/org/apache/camel/utils/cassandra/CassandraUtils.java index b46db34..4300d14 100644 --- a/components/camel-cassandraql/src/main/java/org/apache/camel/utils/cassandra/CassandraUtils.java +++ b/components/camel-cassandraql/src/main/java/org/apache/camel/utils/cassandra/CassandraUtils.java @@ -27,7 +27,10 @@ import com.datastax.driver.core.querybuilder.Select; * */ public class CassandraUtils { - private static boolean isEmpty(Object[] array) { + /** + * Test if the array is null or empty. + */ + public static boolean isEmpty(Object[] array) { return array == null || array.length == 0; } /** http://git-wip-us.apache.org/repos/asf/camel/blob/81f4b94d/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentProducerTest.java b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentProducerTest.java index 5780b5b..4196842 100644 --- a/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentProducerTest.java +++ b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentProducerTest.java @@ -17,6 +17,8 @@ package org.apache.camel.component.cassandra; import com.datastax.driver.core.*; +import static com.datastax.driver.core.querybuilder.QueryBuilder.*; +import com.datastax.driver.core.querybuilder.Update; import org.apache.camel.Produce; import org.apache.camel.ProducerTemplate; import org.apache.camel.builder.RouteBuilder; @@ -123,6 +125,29 @@ public class CassandraComponentProducerTest extends CamelTestSupport { cluster.close(); } + /** + * Test with incoming message containing a header with RegularStatement. + */ + @Test + public void testRequestMessageStatement() throws Exception { + Update.Where update = update("camel_user") + .with(set("first_name", bindMarker())) + .and(set("last_name", bindMarker())) + .where(eq("login", bindMarker())); + Object response = producerTemplate.requestBodyAndHeader(new Object[]{"Claus 2", "Ibsen 2", "c_ibsen"}, + CassandraConstants.CQL_QUERY, update); + + Cluster cluster = CassandraUnitUtils.cassandraCluster(); + Session session = cluster.connect(CassandraUnitUtils.KEYSPACE); + ResultSet resultSet = session.execute("select login, first_name, last_name from camel_user where login = ?", "c_ibsen"); + Row row = resultSet.one(); + assertNotNull(row); + assertEquals("Claus 2", row.getString("first_name")); + assertEquals("Ibsen 2", row.getString("last_name")); + session.close(); + cluster.close(); + } + @Test public void testRequestNotConsistent() throws Exception { http://git-wip-us.apache.org/repos/asf/camel/blob/81f4b94d/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentProducerUnpreparedTest.java ---------------------------------------------------------------------- diff --git a/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentProducerUnpreparedTest.java b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentProducerUnpreparedTest.java index f8702d1..8c07635 100644 --- a/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentProducerUnpreparedTest.java +++ b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentProducerUnpreparedTest.java @@ -17,6 +17,11 @@ package org.apache.camel.component.cassandra; import com.datastax.driver.core.*; +import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker; +import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; +import static com.datastax.driver.core.querybuilder.QueryBuilder.set; +import static com.datastax.driver.core.querybuilder.QueryBuilder.update; +import com.datastax.driver.core.querybuilder.Update; import org.apache.camel.Produce; import org.apache.camel.ProducerTemplate; import org.apache.camel.builder.RouteBuilder; @@ -29,6 +34,8 @@ import org.junit.Test; import java.util.Arrays; import java.util.List; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; public class CassandraComponentProducerUnpreparedTest extends CamelTestSupport { @@ -115,5 +122,28 @@ public class CassandraComponentProducerUnpreparedTest extends CamelTestSupport { session.close(); cluster.close(); } + + /** + * Test with incoming message containing a header with RegularStatement. + */ + @Test + public void testRequestMessageStatement() throws Exception { + Update.Where update = update("camel_user") + .with(set("first_name", "Claus 2")) + .and(set("last_name", "Ibsen 2")) + .where(eq("login", "c_ibsen")); + Object response = producerTemplate.requestBodyAndHeader(null, + CassandraConstants.CQL_QUERY, update); + + Cluster cluster = CassandraUnitUtils.cassandraCluster(); + Session session = cluster.connect(CassandraUnitUtils.KEYSPACE); + ResultSet resultSet = session.execute("select login, first_name, last_name from camel_user where login = ?", "c_ibsen"); + Row row = resultSet.one(); + assertNotNull(row); + assertEquals("Claus 2", row.getString("first_name")); + assertEquals("Ibsen 2", row.getString("last_name")); + session.close(); + cluster.close(); + } }