Repository: camel Updated Branches: refs/heads/master 69934ac79 -> db5e6be5a
CAMEL-2939: Allow to use unprepared statements Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/742eda6b Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/742eda6b Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/742eda6b Branch: refs/heads/master Commit: 742eda6b021f67a63fd7bdae83d9c3d4fda7faa5 Parents: 0592f0c Author: Gerald Quintana <gerald.quint...@zenika.com> Authored: Wed Jan 14 17:54:08 2015 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Feb 18 10:00:48 2015 +0100 ---------------------------------------------------------------------- .../component/cassandra/CassandraConsumer.java | 21 +++- .../component/cassandra/CassandraEndpoint.java | 13 ++ .../component/cassandra/CassandraProducer.java | 59 ++++++++- .../CassandraComponentConsumerTest.java | 17 +++ ...assandraComponentProducerUnpreparedTest.java | 119 +++++++++++++++++++ 5 files changed, 220 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/742eda6b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java index 168a7c2..fdad0ae 100644 --- a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java +++ b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java @@ -47,7 +47,12 @@ public class CassandraConsumer extends ScheduledPollConsumer { protected int poll() throws Exception { // Execute CQL Query Session session = getEndpoint().getSessionHolder().getSession(); - ResultSet resultSet = session.execute(preparedStatement.bind()); + ResultSet resultSet; + if (isPrepareStatements()) { + resultSet = session.execute(preparedStatement.bind()); + } else { + resultSet = session.execute(getEndpoint().getCql()); + } // Create message from ResultSet Exchange exchange = getEndpoint().createExchange(); @@ -69,9 +74,19 @@ public class CassandraConsumer extends ScheduledPollConsumer { @Override protected void doStart() throws Exception { super.doStart(); - - if (preparedStatement == null) { + if (isPrepareStatements()) { preparedStatement = getEndpoint().prepareStatement(); } } + + @Override + protected void doStop() throws Exception { + this.preparedStatement = null; + super.doStop(); + } + + public boolean isPrepareStatements() { + return getEndpoint().isPrepareStatements(); + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/742eda6b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraEndpoint.java b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraEndpoint.java index 5d6ad1f..8d70b9f 100644 --- a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraEndpoint.java +++ b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraEndpoint.java @@ -51,6 +51,11 @@ public class CassandraEndpoint extends DefaultEndpoint { private String keyspace; @UriParam private String cql; + /** + * Use PreparedStatements or normal Statements + */ + @UriParam + private boolean prepareStatements=true; @UriParam private String clusterName; @UriParam @@ -282,4 +287,12 @@ public class CassandraEndpoint extends DefaultEndpoint { public void setResultSetConversionStrategy(ResultSetConversionStrategy resultSetConversionStrategy) { this.resultSetConversionStrategy = resultSetConversionStrategy; } + + public boolean isPrepareStatements() { + return prepareStatements; + } + + public void setPrepareStatements(boolean prepareStatements) { + this.prepareStatements = prepareStatements; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/742eda6b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraProducer.java b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraProducer.java index 824d0eb..4d1d39b 100644 --- a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraProducer.java +++ b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraProducer.java @@ -46,10 +46,26 @@ public class CassandraProducer extends DefaultProducer { } @Override + protected void doStart() throws Exception { + super.doStart(); + if (isPrepareStatements()) { + this.preparedStatement = getEndpoint().prepareStatement(); + } + } + + @Override + protected void doStop() throws Exception { + this.preparedStatement = null; + super.doStop(); + } + + @Override public CassandraEndpoint getEndpoint() { return (CassandraEndpoint) super.getEndpoint(); } - + public boolean isPrepareStatements() { + return getEndpoint().isPrepareStatements(); + } private Object[] getCqlParams(Message message) { Object cqlParamsObj = message.getBody(); Object[] cqlParams; @@ -75,19 +91,29 @@ public class CassandraProducer extends DefaultProducer { Object[] cqlParams = getCqlParams(message); ResultSet resultSet; + Session session = getEndpoint().getSessionHolder().getSession(); + if (isPrepareStatements()) { + resultSet = executePreparedStatement(session, messageCql, cqlParams); + } else { + resultSet = executeStatement(session, messageCql, cqlParams); + } + return resultSet; + } + + /** + * Execute CQL as PreparedStatement + */ + private ResultSet executePreparedStatement(Session session, String messageCql, Object[] cqlParams) { + ResultSet resultSet; PreparedStatement lPreparedStatement; if (messageCql == null || messageCql.isEmpty()) { // URI CQL - if (preparedStatement == null) { - this.preparedStatement = getEndpoint().prepareStatement(); - } lPreparedStatement = this.preparedStatement; } else { // Message CQL lPreparedStatement = getEndpoint().prepareStatement(messageCql); } - Session session = getEndpoint().getSessionHolder().getSession(); - if (cqlParams == null || cqlParams.length==0) { + if (cqlParams == null || cqlParams.length == 0) { resultSet = session.execute(lPreparedStatement.bind()); } else { resultSet = session.execute(lPreparedStatement.bind(cqlParams)); @@ -95,6 +121,27 @@ public class CassandraProducer extends DefaultProducer { return resultSet; } + /** + * Execute CQL as is + */ + private ResultSet executeStatement(Session session, String messageCql, Object[] cqlParams) { + ResultSet resultSet; + String cql; + if (messageCql == null || messageCql.isEmpty()) { + // URI CQL + cql = getEndpoint().getCql(); + } else { + // Message CQL + cql = messageCql; + } + if (cqlParams == null || cqlParams.length == 0) { + resultSet = session.execute(cql); + } else { + resultSet = session.execute(cql, cqlParams); + } + return resultSet; + } + public void process(Exchange exchange) throws Exception { ResultSet resultSet = execute(exchange.getIn()); getEndpoint().fillMessage(resultSet, exchange.getOut()); http://git-wip-us.apache.org/repos/asf/camel/blob/742eda6b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentConsumerTest.java b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentConsumerTest.java index 76d303d..788844e 100644 --- a/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentConsumerTest.java +++ b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentConsumerTest.java @@ -66,6 +66,21 @@ public class CassandraComponentConsumerTest extends CamelTestSupport { } @Test + public void testConsumeUnprepared() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:resultUnprepared"); + mock.expectedMinimumMessageCount(1); + mock.whenAnyExchangeReceived(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + Object body = exchange.getIn().getBody(); + assertTrue(body instanceof List); + } + }); + mock.await(1, TimeUnit.SECONDS); + assertMockEndpointsSatisfied(); + } + + @Test public void testConsumeOne() throws Exception { MockEndpoint mock = getMockEndpoint("mock:resultOne"); mock.expectedMinimumMessageCount(1); @@ -87,6 +102,8 @@ public class CassandraComponentConsumerTest extends CamelTestSupport { public void configure() { from("cql://localhost/camel_ks?cql=" + CQL) .to("mock:resultAll"); + from("cql://localhost/camel_ks?cql=" + CQL + "&prepareStatements=false") + .to("mock:resultUnprepared"); from("cql://localhost/camel_ks?cql=" + CQL + "&resultSetConversionStrategy=ONE") .to("mock:resultOne"); } http://git-wip-us.apache.org/repos/asf/camel/blob/742eda6b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentProducerUnpreparedTest.java ---------------------------------------------------------------------- diff --git a/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentProducerUnpreparedTest.java b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentProducerUnpreparedTest.java new file mode 100644 index 0000000..f8702d1 --- /dev/null +++ b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentProducerUnpreparedTest.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.cassandra; + +import com.datastax.driver.core.*; +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.cassandraunit.CassandraCQLUnit; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; + +public class CassandraComponentProducerUnpreparedTest extends CamelTestSupport { + + private static final String CQL = "insert into camel_user(login, first_name, last_name) values (?, ?, ?)"; + private static final String NO_PARAMETER_CQL = "select login, first_name, last_name from camel_user"; + + @Rule + public CassandraCQLUnit cassandra = CassandraUnitUtils.cassandraCQLUnit(); + + @Produce(uri = "direct:input") + ProducerTemplate producerTemplate; + + @Produce(uri = "direct:inputNoParameter") + ProducerTemplate noParameterProducerTemplate; + + @BeforeClass + public static void setUpClass() throws Exception { + CassandraUnitUtils.startEmbeddedCassandra(); + } + + @AfterClass + public static void tearDownClass() throws Exception { + CassandraUnitUtils.cleanEmbeddedCassandra(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() { + + from("direct:input") + .to("cql://localhost/camel_ks?cql=" + CQL+"&prepareStatements=false"); + from("direct:inputNoParameter") + .to("cql://localhost/camel_ks?cql=" + NO_PARAMETER_CQL+"&prepareStatements=false"); + } + }; + } + + @Test + public void testRequestUriCql() throws Exception { + Object response = producerTemplate.requestBody(Arrays.asList("w_jiang", "Willem", "Jiang")); + + Cluster cluster = CassandraUnitUtils.cassandraCluster(); + Session session = cluster.connect(CassandraUnitUtils.KEYSPACE); + ResultSet resultSet = session.execute("select login, first_name, last_name from camel_user where login = ?", "w_jiang"); + Row row = resultSet.one(); + assertNotNull(row); + assertEquals("Willem", row.getString("first_name")); + assertEquals("Jiang", row.getString("last_name")); + session.close(); + cluster.close(); + } + + @Test + public void testRequestNoParameter_Null() throws Exception { + Object response = noParameterProducerTemplate.requestBody(null); + + assertNotNull(response); + assertIsInstanceOf(List.class, response); + List<Row> rows = (List<Row>) response; + } + + @Test + public void testRequestNoParameter_Empty() throws Exception { + Object response = noParameterProducerTemplate.requestBody(null); + + assertNotNull(response); + assertIsInstanceOf(List.class, response); + List<Row> rows = (List<Row>) response; + } + + @Test + public void testRequestMessageCql() throws Exception { + Object response = producerTemplate.requestBodyAndHeader(new Object[]{"Claus 2", "Ibsen 2", "c_ibsen"}, + CassandraConstants.CQL_QUERY, "update camel_user set first_name=?, last_name=? where login=?"); + + Cluster cluster = CassandraUnitUtils.cassandraCluster(); + Session session = cluster.connect(CassandraUnitUtils.KEYSPACE); + ResultSet resultSet = session.execute("select login, first_name, last_name from camel_user where login = ?", "c_ibsen"); + Row row = resultSet.one(); + assertNotNull(row); + assertEquals("Claus 2", row.getString("first_name")); + assertEquals("Ibsen 2", row.getString("last_name")); + session.close(); + cluster.close(); + } + +}