Repository: camel
Updated Branches:
  refs/heads/master 69934ac79 -> db5e6be5a


CAMEL-2939: Allow to use unprepared statements


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/742eda6b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/742eda6b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/742eda6b

Branch: refs/heads/master
Commit: 742eda6b021f67a63fd7bdae83d9c3d4fda7faa5
Parents: 0592f0c
Author: Gerald Quintana <gerald.quint...@zenika.com>
Authored: Wed Jan 14 17:54:08 2015 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Wed Feb 18 10:00:48 2015 +0100

----------------------------------------------------------------------
 .../component/cassandra/CassandraConsumer.java  |  21 +++-
 .../component/cassandra/CassandraEndpoint.java  |  13 ++
 .../component/cassandra/CassandraProducer.java  |  59 ++++++++-
 .../CassandraComponentConsumerTest.java         |  17 +++
 ...assandraComponentProducerUnpreparedTest.java | 119 +++++++++++++++++++
 5 files changed, 220 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/742eda6b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java
index 168a7c2..fdad0ae 100644
--- a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java
+++ b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java
@@ -47,7 +47,12 @@ public class CassandraConsumer extends ScheduledPollConsumer {
     protected int poll() throws Exception {
         // Execute CQL Query
         Session session = getEndpoint().getSessionHolder().getSession();
-        ResultSet resultSet = session.execute(preparedStatement.bind());
+        ResultSet resultSet;
+        if (isPrepareStatements()) {
+            resultSet = session.execute(preparedStatement.bind());
+        } else {
+            resultSet = session.execute(getEndpoint().getCql());
+        }
 
         // Create message from ResultSet
         Exchange exchange = getEndpoint().createExchange();
@@ -69,9 +74,19 @@ public class CassandraConsumer extends ScheduledPollConsumer {
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-
-        if (preparedStatement == null) {
+        if (isPrepareStatements()) {
             preparedStatement = getEndpoint().prepareStatement();
         }
     }
+
+    @Override
+    protected void doStop() throws Exception {
+        this.preparedStatement = null;
+        super.doStop();
+    }
+
+    public boolean isPrepareStatements() {
+        return getEndpoint().isPrepareStatements();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/742eda6b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraEndpoint.java b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraEndpoint.java
index 5d6ad1f..8d70b9f 100644
--- a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraEndpoint.java
+++ b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraEndpoint.java
@@ -51,6 +51,11 @@ public class CassandraEndpoint extends DefaultEndpoint {
     private String keyspace;
     @UriParam
     private String cql;
+    /**
+     * Use PreparedStatements or normal Statements
+     */
+    @UriParam
+    private boolean prepareStatements=true;
     @UriParam
     private String clusterName;
     @UriParam
@@ -282,4 +287,12 @@ public class CassandraEndpoint extends DefaultEndpoint {
     public void setResultSetConversionStrategy(ResultSetConversionStrategy resultSetConversionStrategy) {
         this.resultSetConversionStrategy = resultSetConversionStrategy;
     }
+
+    public boolean isPrepareStatements() {
+        return prepareStatements;
+    }
+
+    public void setPrepareStatements(boolean prepareStatements) {
+        this.prepareStatements = prepareStatements;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/742eda6b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraProducer.java b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraProducer.java
index 824d0eb..4d1d39b 100644
--- a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraProducer.java
+++ b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraProducer.java
@@ -46,10 +46,26 @@ public class CassandraProducer extends DefaultProducer {
     }
 
     @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        if (isPrepareStatements()) {
+            this.preparedStatement = getEndpoint().prepareStatement();
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        this.preparedStatement = null;
+        super.doStop();
+    }
+
+    @Override
     public CassandraEndpoint getEndpoint() {
         return (CassandraEndpoint) super.getEndpoint();
     }
-
+    public boolean isPrepareStatements() {
+        return getEndpoint().isPrepareStatements();
+    }
     private Object[] getCqlParams(Message message) {
         Object cqlParamsObj = message.getBody();
         Object[] cqlParams;
@@ -75,19 +91,29 @@ public class CassandraProducer extends DefaultProducer {
         Object[] cqlParams = getCqlParams(message);
 
         ResultSet resultSet;
+        Session session = getEndpoint().getSessionHolder().getSession();
+        if (isPrepareStatements()) {
+            resultSet = executePreparedStatement(session, messageCql, cqlParams);
+        } else {
+            resultSet = executeStatement(session, messageCql, cqlParams);
+        }
+        return resultSet;
+    }
+
+    /**
+     * Execute CQL as PreparedStatement
+     */
+    private ResultSet executePreparedStatement(Session session, String messageCql, Object[] cqlParams) {
+        ResultSet resultSet;
         PreparedStatement lPreparedStatement;
         if (messageCql == null || messageCql.isEmpty()) {
             // URI CQL
-            if (preparedStatement == null) {
-                this.preparedStatement = getEndpoint().prepareStatement();
-            }
             lPreparedStatement = this.preparedStatement;
         } else {
             // Message CQL
             lPreparedStatement = getEndpoint().prepareStatement(messageCql);
         }
-        Session session = getEndpoint().getSessionHolder().getSession();
-        if (cqlParams == null || cqlParams.length==0) {
+        if (cqlParams == null || cqlParams.length == 0) {
             resultSet = session.execute(lPreparedStatement.bind());
         } else {
             resultSet = session.execute(lPreparedStatement.bind(cqlParams));
@@ -95,6 +121,27 @@ public class CassandraProducer extends DefaultProducer {
         return resultSet;
     }
 
+    /**
+     * Execute CQL as is
+     */
+    private ResultSet executeStatement(Session session, String messageCql, Object[] cqlParams) {
+        ResultSet resultSet;
+        String cql;
+        if (messageCql == null || messageCql.isEmpty()) {
+            // URI CQL
+            cql = getEndpoint().getCql();
+        } else {
+            // Message CQL
+            cql = messageCql;
+        }
+        if (cqlParams == null || cqlParams.length == 0) {
+            resultSet = session.execute(cql);
+        } else {
+            resultSet = session.execute(cql, cqlParams);
+        }
+        return resultSet;
+    }
+
     public void process(Exchange exchange) throws Exception {
         ResultSet resultSet = execute(exchange.getIn());
         getEndpoint().fillMessage(resultSet, exchange.getOut());

http://git-wip-us.apache.org/repos/asf/camel/blob/742eda6b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentConsumerTest.java b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentConsumerTest.java
index 76d303d..788844e 100644
--- a/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentConsumerTest.java
+++ b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentConsumerTest.java
@@ -66,6 +66,21 @@ public class CassandraComponentConsumerTest extends CamelTestSupport {
     }
 
     @Test
+    public void testConsumeUnprepared() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:resultUnprepared");
+        mock.expectedMinimumMessageCount(1);
+        mock.whenAnyExchangeReceived(new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                Object body = exchange.getIn().getBody();
+                assertTrue(body instanceof List);
+            }
+        });
+        mock.await(1, TimeUnit.SECONDS);
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
     public void testConsumeOne() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:resultOne");
         mock.expectedMinimumMessageCount(1);
@@ -87,6 +102,8 @@ public class CassandraComponentConsumerTest extends CamelTestSupport {
             public void configure() {
                 from("cql://localhost/camel_ks?cql=" + CQL)
                         .to("mock:resultAll");
+                from("cql://localhost/camel_ks?cql=" + CQL + "&prepareStatements=false")
+                        .to("mock:resultUnprepared");
                 from("cql://localhost/camel_ks?cql=" + CQL + "&resultSetConversionStrategy=ONE")
                         .to("mock:resultOne");
             }

http://git-wip-us.apache.org/repos/asf/camel/blob/742eda6b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentProducerUnpreparedTest.java
----------------------------------------------------------------------
diff --git a/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentProducerUnpreparedTest.java b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentProducerUnpreparedTest.java
new file mode 100644
index 0000000..f8702d1
--- /dev/null
+++ b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentProducerUnpreparedTest.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cassandra;
+
+import com.datastax.driver.core.*;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.cassandraunit.CassandraCQLUnit;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class CassandraComponentProducerUnpreparedTest extends CamelTestSupport {
+
+    private static final String CQL = "insert into camel_user(login, first_name, last_name) values (?, ?, ?)";
+    private static final String NO_PARAMETER_CQL = "select login, first_name, last_name from camel_user";
+
+    @Rule
+    public CassandraCQLUnit cassandra = CassandraUnitUtils.cassandraCQLUnit();
+
+    @Produce(uri = "direct:input")
+    ProducerTemplate producerTemplate;
+
+    @Produce(uri = "direct:inputNoParameter")
+    ProducerTemplate noParameterProducerTemplate;
+
+    @BeforeClass
+    public static void setUpClass() throws Exception {
+        CassandraUnitUtils.startEmbeddedCassandra();
+    }
+
+    @AfterClass
+    public static void tearDownClass() throws Exception {
+        CassandraUnitUtils.cleanEmbeddedCassandra();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() {
+
+                from("direct:input")
+                        .to("cql://localhost/camel_ks?cql=" + CQL+"&prepareStatements=false");
+                from("direct:inputNoParameter")
+                        .to("cql://localhost/camel_ks?cql=" + NO_PARAMETER_CQL+"&prepareStatements=false");
+            }
+        };
+    }
+
+    @Test
+    public void testRequestUriCql() throws Exception {
+        Object response = producerTemplate.requestBody(Arrays.asList("w_jiang", "Willem", "Jiang"));
+
+        Cluster cluster = CassandraUnitUtils.cassandraCluster();
+        Session session = cluster.connect(CassandraUnitUtils.KEYSPACE);
+        ResultSet resultSet = session.execute("select login, first_name, last_name from camel_user where login = ?", "w_jiang");
+        Row row = resultSet.one();
+        assertNotNull(row);
+        assertEquals("Willem", row.getString("first_name"));
+        assertEquals("Jiang", row.getString("last_name"));
+        session.close();
+        cluster.close();
+    }
+
+    @Test
+    public void testRequestNoParameter_Null() throws Exception {
+        Object response = noParameterProducerTemplate.requestBody(null);
+
+        assertNotNull(response);
+        assertIsInstanceOf(List.class, response);
+        List<Row> rows = (List<Row>) response;
+    }
+
+    @Test
+    public void testRequestNoParameter_Empty() throws Exception {
+        Object response = noParameterProducerTemplate.requestBody(null);
+
+        assertNotNull(response);
+        assertIsInstanceOf(List.class, response);
+        List<Row> rows = (List<Row>) response;
+    }
+
+    @Test
+    public void testRequestMessageCql() throws Exception {
+        Object response = producerTemplate.requestBodyAndHeader(new Object[]{"Claus 2", "Ibsen 2", "c_ibsen"},
+                CassandraConstants.CQL_QUERY, "update camel_user set first_name=?, last_name=? where login=?");
+
+        Cluster cluster = CassandraUnitUtils.cassandraCluster();
+        Session session = cluster.connect(CassandraUnitUtils.KEYSPACE);
+        ResultSet resultSet = session.execute("select login, first_name, last_name from camel_user where login = ?", "c_ibsen");
+        Row row = resultSet.one();
+        assertNotNull(row);
+        assertEquals("Claus 2", row.getString("first_name"));
+        assertEquals("Ibsen 2", row.getString("last_name"));
+        session.close();
+        cluster.close();
+    }
+
+}

Reply via email to