This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit fe801e03a8760237844299af1e869a89a64e85d4
Author: Otavio Rodolfo Piske <opi...@redhat.com>
AuthorDate: Mon Feb 8 19:53:45 2021 +0100

    Converted the Cassandra source test case to use the reusable source base 
class
---
 .../source/CamelSourceCassandraITCase.java         | 60 +++++++++-------------
 1 file changed, 24 insertions(+), 36 deletions(-)

diff --git 
a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/source/CamelSourceCassandraITCase.java
 
b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/source/CamelSourceCassandraITCase.java
index 04c9402..6508546 100644
--- 
a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/source/CamelSourceCassandraITCase.java
+++ 
b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/source/CamelSourceCassandraITCase.java
@@ -19,18 +19,16 @@ package org.apache.camel.kafkaconnector.cassandra.source;
 
 import java.util.concurrent.ExecutionException;
 
-import com.datastax.oss.driver.api.core.cql.Row;
 import org.apache.camel.kafkaconnector.cassandra.clients.CassandraClient;
 import org.apache.camel.kafkaconnector.cassandra.clients.dao.TestDataDao;
 import 
org.apache.camel.kafkaconnector.cassandra.clients.dao.TestResultSetConversionStrategy;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
 import org.apache.camel.test.infra.cassandra.services.CassandraService;
 import org.apache.camel.test.infra.cassandra.services.CassandraServiceFactory;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
@@ -43,7 +41,7 @@ import static 
org.apache.camel.kafkaconnector.common.BasicConnectorPropertyFacto
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSourceCassandraITCase extends AbstractKafkaTest {
+public class CamelSourceCassandraITCase extends CamelSourceTestSupport {
     @RegisterExtension
     public static CassandraService cassandraService = 
CassandraServiceFactory.createService();
 
@@ -51,18 +49,17 @@ public class CamelSourceCassandraITCase extends 
AbstractKafkaTest {
 
     private CassandraClient cassandraClient;
     private TestDataDao testDataDao;
+    private String topicName;
 
     private final int expect = 1;
-    private int received;
 
     @Override
     protected String[] getConnectorsInTest() {
         return new String[] {"camel-cql-kafka-connector"};
     }
 
-    @BeforeEach
-    public void setUp() {
-        received = 0;
+    @BeforeAll
+    public void setUpTestData() {
         cassandraClient = new 
CassandraClient(cassandraService.getCassandraHost(), 
cassandraService.getCQL3Port());
 
         testDataDao = cassandraClient.newTestDataDao();
@@ -76,7 +73,12 @@ public class CamelSourceCassandraITCase extends 
AbstractKafkaTest {
         }
     }
 
-    @AfterEach
+    @BeforeEach
+    public void setUpTest() {
+        topicName = getTopicForTest(this);
+    }
+
+    @AfterAll
     public void tearDown() {
         if (testDataDao != null) {
             try {
@@ -87,59 +89,45 @@ public class CamelSourceCassandraITCase extends 
AbstractKafkaTest {
         }
     }
 
-    private <T> boolean checkRecord(ConsumerRecord<String, T> record) {
-
-        LOG.debug("Received: {}", record.value());
-        received++;
-
-        return false;
+    @Override
+    protected void produceTestData() {
+        // NO-OP (done at the testSetup)
     }
 
-    public void runTest(ConnectorPropertyFactory connectorPropertyFactory) 
throws ExecutionException, InterruptedException {
-        connectorPropertyFactory.log();
-
-        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
-
-        LOG.debug("Creating the consumer ...");
-        KafkaClient<String, Row> kafkaClient = new 
KafkaClient<>(getKafkaService().getBootstrapServers());
-        kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), 
this::checkRecord);
-        LOG.debug("Created the consumer ...");
+    @Override
+    protected void verifyMessages(TestMessageConsumer<?> consumer) {
+        int received = consumer.consumedMessages().size();
 
         assertEquals(received, expect, "Didn't process the expected amount of 
messages");
     }
 
-
     @Timeout(90)
     @Test
     public void testRetrieveFromCassandra() throws ExecutionException, 
InterruptedException {
-        String topic = TestUtils.getDefaultTestTopic(this.getClass());
-
         ConnectorPropertyFactory connectorPropertyFactory = 
CamelCassandraPropertyFactory
                 .basic()
-                .withKafkaTopic(topic)
+                .withKafkaTopic(topicName)
                 .withHosts(cassandraService.getCassandraHost())
                 .withPort(cassandraService.getCQL3Port())
                 .withKeySpace(TestDataDao.KEY_SPACE)
                 .withResultSetConversionStrategy("ONE")
                 .withCql(testDataDao.getSelectStatement());
 
-        runTest(connectorPropertyFactory);
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 
     @Timeout(90)
     @Test
     public void testRetrieveFromCassandraWithCustomStrategy() throws 
ExecutionException, InterruptedException {
-        String topic = TestUtils.getDefaultTestTopic(this.getClass());
-
         ConnectorPropertyFactory connectorPropertyFactory = 
CamelCassandraPropertyFactory
                 .basic()
-                .withKafkaTopic(topic)
+                .withKafkaTopic(topicName)
                 .withHosts(cassandraService.getCassandraHost())
                 .withPort(cassandraService.getCQL3Port())
                 .withKeySpace(TestDataDao.KEY_SPACE)
                 
.withResultSetConversionStrategy(classRef(TestResultSetConversionStrategy.class.getName()))
                 .withCql(testDataDao.getSelectStatement());
 
-        runTest(connectorPropertyFactory);
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 }

Reply via email to