This is an automated email from the ASF dual-hosted git repository. github-bot pushed a commit to branch camel-master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 054de9d177413d7267f5f774d60b772c8e94b4e0 Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Tue Aug 25 09:49:41 2020 +0200 Fix cassandra for camel 3.5 Adjusts the client code to use the newer Cassandra client version from camel-cassandraql --- .../cassandra/clients/CassandraClient.java | 18 ++++----- .../cassandra/clients/dao/TestDataDao.java | 43 ++++++++++++++-------- .../dao/TestResultSetConversionStrategy.java | 4 +- .../cassandra/sink/CamelSinkCassandraITCase.java | 6 ++- .../source/CamelSourceCassandraITCase.java | 8 +++- 5 files changed, 49 insertions(+), 30 deletions(-) diff --git a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/clients/CassandraClient.java b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/clients/CassandraClient.java index 2c26f24..b8b1704 100644 --- a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/clients/CassandraClient.java +++ b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/clients/CassandraClient.java @@ -17,24 +17,24 @@ package org.apache.camel.kafkaconnector.cassandra.clients; -import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.Session; +import java.net.InetSocketAddress; + +import com.datastax.oss.driver.api.core.CqlSession; import org.apache.camel.kafkaconnector.cassandra.clients.dao.TestDataDao; /** * A simple client for Cassandra for testing purposes */ public class CassandraClient { - private Cluster cluster; - private Session session; + private CqlSession session; public CassandraClient(String host, int port) { - cluster = Cluster.builder() - .addContactPoint(host) - .withPort(port) - .build(); + InetSocketAddress socketAddress = new InetSocketAddress(host, port); - session = cluster.connect(); + session = CqlSession.builder() + .addContactPoint(socketAddress) + .withLocalDatacenter("datacenter1") + .build(); } diff --git a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/clients/dao/TestDataDao.java b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/clients/dao/TestDataDao.java index 4fcde37..ea8de7a 100644 --- a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/clients/dao/TestDataDao.java +++ b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/clients/dao/TestDataDao.java @@ -23,11 +23,11 @@ import java.util.List; import java.util.Map; import java.util.function.Consumer; -import com.datastax.driver.core.DataType; -import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.Row; -import com.datastax.driver.core.Session; -import com.datastax.driver.core.schemabuilder.SchemaBuilder; +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.cql.ResultSet; +import com.datastax.oss.driver.api.core.cql.Row; +import com.datastax.oss.driver.api.core.type.DataTypes; +import com.datastax.oss.driver.api.querybuilder.SchemaBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,9 +38,9 @@ public class TestDataDao { private static final Logger LOG = LoggerFactory.getLogger(TestDataDao.class); - private final Session session; + private final CqlSession session; - public TestDataDao(Session session) { + public TestDataDao(CqlSession session) { this.session = session; } @@ -52,12 +52,16 @@ public class TestDataDao { String statement = SchemaBuilder.createKeyspace(KEY_SPACE) .ifNotExists() - .with() - .replication(replication).getQueryString(); + .withReplicationOptions(replication) + .asCql(); LOG.info("Executing {}", statement); - session.execute(statement); + ResultSet rs = session.execute(statement); + + if (!rs.wasApplied()) { + LOG.warn("The create key space statement did not execute"); + } } public void useKeySpace() { @@ -69,22 +73,29 @@ public class TestDataDao { public void createTable() { String statement = SchemaBuilder.createTable(TABLE_NAME) - .addPartitionKey("id", DataType.timeuuid()) - .addClusteringColumn("text", DataType.text()) - .getQueryString(); + .withPartitionKey("id", DataTypes.TIMEUUID) + .withClusteringColumn("text", DataTypes.TEXT) + .asCql(); + LOG.info("Executing create table {}", statement); - session.execute(statement); + ResultSet rs = session.execute(statement); + if (!rs.wasApplied()) { + LOG.warn("The create table statement did not execute"); + } } public void dropTable() { String statement = SchemaBuilder.dropTable(TABLE_NAME) - .getQueryString(); + .asCql(); LOG.info("Executing drop table {}", statement); - session.execute(statement); + ResultSet rs = session.execute(statement); + if (!rs.wasApplied()) { + LOG.warn("The drop table statement did not execute"); + } } public boolean hasEnoughData(long expected) { diff --git a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/clients/dao/TestResultSetConversionStrategy.java b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/clients/dao/TestResultSetConversionStrategy.java index 84f57a2..63febeb 100644 --- a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/clients/dao/TestResultSetConversionStrategy.java +++ b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/clients/dao/TestResultSetConversionStrategy.java @@ -21,8 +21,8 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.Row; +import com.datastax.oss.driver.api.core.cql.ResultSet; +import com.datastax.oss.driver.api.core.cql.Row; import org.apache.camel.component.cassandra.ResultSetConversionStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java index 0cdf1b6..c583d71 100644 --- a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java +++ b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java @@ -77,7 +77,11 @@ public class CamelSinkCassandraITCase extends AbstractKafkaTest { cassandraClient = cassandraService.getClient(); if (testDataDao != null) { - testDataDao.dropTable(); + try { + testDataDao.dropTable(); + } catch (Exception e) { + LOG.warn("Unable to drop the table: {}", e.getMessage(), e); + } } } diff --git a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/source/CamelSourceCassandraITCase.java b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/source/CamelSourceCassandraITCase.java index 707f923..25ef1ee 100644 --- a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/source/CamelSourceCassandraITCase.java +++ b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/source/CamelSourceCassandraITCase.java @@ -19,7 +19,7 @@ package org.apache.camel.kafkaconnector.cassandra.source; import java.util.concurrent.ExecutionException; -import com.datastax.driver.core.Row; +import com.datastax.oss.driver.api.core.cql.Row; import org.apache.camel.kafkaconnector.cassandra.clients.CassandraClient; import org.apache.camel.kafkaconnector.cassandra.clients.dao.TestDataDao; import org.apache.camel.kafkaconnector.cassandra.clients.dao.TestResultSetConversionStrategy; @@ -78,7 +78,11 @@ public class CamelSourceCassandraITCase extends AbstractKafkaTest { cassandraClient = cassandraService.getClient(); if (testDataDao != null) { - testDataDao.dropTable(); + try { + testDataDao.dropTable(); + } catch (Exception e) { + LOG.warn("Unable to drop the table: {}", e.getMessage(), e); + } } }