CAMEL-2939: Support for RegularStatement in messages processed by Producer

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/81f4b94d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/81f4b94d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/81f4b94d

Branch: refs/heads/master
Commit: 81f4b94d2bf21b1e27306f4c97dab7ab9456407c
Parents: d4419bc
Author: Gerald Quintana <gerald.quint...@gmail.com>
Authored: Sun Jan 18 21:29:41 2015 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Wed Feb 18 10:00:49 2015 +0100

----------------------------------------------------------------------
 .../component/cassandra/CassandraProducer.java  | 43 ++++++++++++++------
 .../camel/utils/cassandra/CassandraUtils.java   |  5 ++-
 .../CassandraComponentProducerTest.java         | 25 ++++++++++++
 ...assandraComponentProducerUnpreparedTest.java | 30 ++++++++++++++
 4 files changed, 90 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/81f4b94d/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraProducer.java b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraProducer.java
index 4d1d39b..394283c 100644
--- a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraProducer.java
+++ b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraProducer.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.cassandra;
 
 import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.RegularStatement;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Session;
 import org.apache.camel.Exchange;
@@ -26,6 +27,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
+import static org.apache.camel.utils.cassandra.CassandraUtils.isEmpty;
 
 /**
  * Cassandra 2 CQL3 producer.
@@ -87,7 +89,11 @@ public class CassandraProducer extends DefaultProducer {
      * Execute CQL query using incoming message body has statement parameters.
      */
     private ResultSet execute(Message message) {
-        String messageCql = message.getHeader(CassandraConstants.CQL_QUERY, String.class);
+        Object messageCql = message.getHeader(CassandraConstants.CQL_QUERY);
+        // Convert Empty string to null
+        if (messageCql instanceof String && ((String) messageCql).isEmpty()) {
+            messageCql = null;
+        }
         Object[] cqlParams = getCqlParams(message);
 
         ResultSet resultSet;
@@ -103,17 +109,22 @@ public class CassandraProducer extends DefaultProducer {
     /**
      * Execute CQL as PreparedStatement
      */
-    private ResultSet executePreparedStatement(Session session, String messageCql, Object[] cqlParams) {
+    private ResultSet executePreparedStatement(Session session, Object messageCql, Object[] cqlParams) {
         ResultSet resultSet;
         PreparedStatement lPreparedStatement;
-        if (messageCql == null || messageCql.isEmpty()) {
+        if (messageCql == null) {
             // URI CQL
             lPreparedStatement = this.preparedStatement;
-        } else {
+        } else if (messageCql instanceof String) {
             // Message CQL
-            lPreparedStatement = getEndpoint().prepareStatement(messageCql);
+            lPreparedStatement = getEndpoint().prepareStatement((String) messageCql);
+        } else if (messageCql instanceof RegularStatement) {
+            // Message Statement
+            lPreparedStatement = getEndpoint().getSession().prepare((RegularStatement) messageCql);
+        } else {
+            throw new IllegalArgumentException("Invalid "+CassandraConstants.CQL_QUERY+" header");
         }
-        if (cqlParams == null || cqlParams.length == 0) {
+        if (isEmpty(cqlParams)) {
             resultSet = session.execute(lPreparedStatement.bind());
         } else {
             resultSet = session.execute(lPreparedStatement.bind(cqlParams));
@@ -124,17 +135,25 @@ public class CassandraProducer extends DefaultProducer {
     /**
      * Execute CQL as is
      */
-    private ResultSet executeStatement(Session session, String messageCql, Object[] cqlParams) {
+    private ResultSet executeStatement(Session session, Object messageCql, Object[] cqlParams) {
         ResultSet resultSet;
-        String cql;
-        if (messageCql == null || messageCql.isEmpty()) {
+        String cql = null;
+        RegularStatement statement = null;
+        if (messageCql == null) {
             // URI CQL
             cql = getEndpoint().getCql();
-        } else {
+        } else if (messageCql instanceof String) {
             // Message CQL
-            cql = messageCql;
+            cql = (String) messageCql;
+        } else if (messageCql instanceof RegularStatement) {
+            // Message Statement
+            statement = (RegularStatement) messageCql;
+        } else {
+            throw new IllegalArgumentException("Invalid "+CassandraConstants.CQL_QUERY+" header");
         }
-        if (cqlParams == null || cqlParams.length == 0) {
+        if (statement != null) {
+            resultSet = session.execute(statement);
+        } else if (isEmpty(cqlParams)) {
             resultSet = session.execute(cql);
         } else {
             resultSet = session.execute(cql, cqlParams);

http://git-wip-us.apache.org/repos/asf/camel/blob/81f4b94d/components/camel-cassandraql/src/main/java/org/apache/camel/utils/cassandra/CassandraUtils.java
----------------------------------------------------------------------
diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/utils/cassandra/CassandraUtils.java b/components/camel-cassandraql/src/main/java/org/apache/camel/utils/cassandra/CassandraUtils.java
index b46db34..4300d14 100644
--- a/components/camel-cassandraql/src/main/java/org/apache/camel/utils/cassandra/CassandraUtils.java
+++ b/components/camel-cassandraql/src/main/java/org/apache/camel/utils/cassandra/CassandraUtils.java
@@ -27,7 +27,10 @@ import com.datastax.driver.core.querybuilder.Select;
  *
  */
 public class CassandraUtils {
-    private static boolean isEmpty(Object[] array) {
+    /**
+     * Test if the array is null or empty.
+     */
+    public static boolean isEmpty(Object[] array) {
         return array == null || array.length == 0;
     }
     /**

http://git-wip-us.apache.org/repos/asf/camel/blob/81f4b94d/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentProducerTest.java b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentProducerTest.java
index 5780b5b..4196842 100644
--- a/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentProducerTest.java
+++ b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentProducerTest.java
@@ -17,6 +17,8 @@
 package org.apache.camel.component.cassandra;
 
 import com.datastax.driver.core.*;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.*;
+import com.datastax.driver.core.querybuilder.Update;
 import org.apache.camel.Produce;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.builder.RouteBuilder;
@@ -123,6 +125,29 @@ public class CassandraComponentProducerTest extends CamelTestSupport {
         cluster.close();
     }
 
+    /**
+     * Test with incoming message containing a header with RegularStatement.
+     */
+    @Test
+    public void testRequestMessageStatement() throws Exception {
+        Update.Where update = update("camel_user")
+                .with(set("first_name", bindMarker()))
+                .and(set("last_name", bindMarker()))
+                .where(eq("login", bindMarker()));
+        Object response = producerTemplate.requestBodyAndHeader(new Object[]{"Claus 2", "Ibsen 2", "c_ibsen"},
+                CassandraConstants.CQL_QUERY, update);
+
+        Cluster cluster = CassandraUnitUtils.cassandraCluster();
+        Session session = cluster.connect(CassandraUnitUtils.KEYSPACE);
+        ResultSet resultSet = session.execute("select login, first_name, last_name from camel_user where login = ?", "c_ibsen");
+        Row row = resultSet.one();
+        assertNotNull(row);
+        assertEquals("Claus 2", row.getString("first_name"));
+        assertEquals("Ibsen 2", row.getString("last_name"));
+        session.close();
+        cluster.close();
+    }
+
     @Test
     public void testRequestNotConsistent() throws Exception {
 

http://git-wip-us.apache.org/repos/asf/camel/blob/81f4b94d/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentProducerUnpreparedTest.java
----------------------------------------------------------------------
diff --git a/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentProducerUnpreparedTest.java b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentProducerUnpreparedTest.java
index f8702d1..8c07635 100644
--- a/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentProducerUnpreparedTest.java
+++ b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentProducerUnpreparedTest.java
@@ -17,6 +17,11 @@
 package org.apache.camel.component.cassandra;
 
 import com.datastax.driver.core.*;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.set;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.update;
+import com.datastax.driver.core.querybuilder.Update;
 import org.apache.camel.Produce;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.builder.RouteBuilder;
@@ -29,6 +34,8 @@ import org.junit.Test;
 
 import java.util.Arrays;
 import java.util.List;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 
 public class CassandraComponentProducerUnpreparedTest extends CamelTestSupport {
 
@@ -115,5 +122,28 @@ public class CassandraComponentProducerUnpreparedTest extends CamelTestSupport {
         session.close();
         cluster.close();
     }
+    
+    /**
+     * Test with incoming message containing a header with RegularStatement.
+     */
+    @Test
+    public void testRequestMessageStatement() throws Exception {
+        Update.Where update = update("camel_user")
+                .with(set("first_name", "Claus 2"))
+                .and(set("last_name", "Ibsen 2"))
+                .where(eq("login", "c_ibsen"));
+        Object response = producerTemplate.requestBodyAndHeader(null,
+                CassandraConstants.CQL_QUERY, update);
+
+        Cluster cluster = CassandraUnitUtils.cassandraCluster();
+        Session session = cluster.connect(CassandraUnitUtils.KEYSPACE);
+        ResultSet resultSet = session.execute("select login, first_name, last_name from camel_user where login = ?", "c_ibsen");
+        Row row = resultSet.one();
+        assertNotNull(row);
+        assertEquals("Claus 2", row.getString("first_name"));
+        assertEquals("Ibsen 2", row.getString("last_name"));
+        session.close();
+        cluster.close();
+    }
 
 }

Reply via email to