This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit fe801e03a8760237844299af1e869a89a64e85d4 Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Mon Feb 8 19:53:45 2021 +0100 Converted the Cassandra source test case to use the reusable source base class --- .../source/CamelSourceCassandraITCase.java | 60 +++++++++------------- 1 file changed, 24 insertions(+), 36 deletions(-) diff --git a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/source/CamelSourceCassandraITCase.java b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/source/CamelSourceCassandraITCase.java index 04c9402..6508546 100644 --- a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/source/CamelSourceCassandraITCase.java +++ b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/source/CamelSourceCassandraITCase.java @@ -19,18 +19,16 @@ package org.apache.camel.kafkaconnector.cassandra.source; import java.util.concurrent.ExecutionException; -import com.datastax.oss.driver.api.core.cql.Row; import org.apache.camel.kafkaconnector.cassandra.clients.CassandraClient; import org.apache.camel.kafkaconnector.cassandra.clients.dao.TestDataDao; import org.apache.camel.kafkaconnector.cassandra.clients.dao.TestResultSetConversionStrategy; -import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; -import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; -import org.apache.camel.kafkaconnector.common.utils.TestUtils; +import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport; +import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer; import org.apache.camel.test.infra.cassandra.services.CassandraService; import org.apache.camel.test.infra.cassandra.services.CassandraServiceFactory; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.junit.jupiter.api.AfterEach; +import 
org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; @@ -43,7 +41,7 @@ import static org.apache.camel.kafkaconnector.common.BasicConnectorPropertyFacto import static org.junit.jupiter.api.Assertions.assertEquals; @TestInstance(TestInstance.Lifecycle.PER_CLASS) -public class CamelSourceCassandraITCase extends AbstractKafkaTest { +public class CamelSourceCassandraITCase extends CamelSourceTestSupport { @RegisterExtension public static CassandraService cassandraService = CassandraServiceFactory.createService(); @@ -51,18 +49,17 @@ public class CamelSourceCassandraITCase extends AbstractKafkaTest { private CassandraClient cassandraClient; private TestDataDao testDataDao; + private String topicName; private final int expect = 1; - private int received; @Override protected String[] getConnectorsInTest() { return new String[] {"camel-cql-kafka-connector"}; } - @BeforeEach - public void setUp() { - received = 0; + @BeforeAll + public void setUpTestData() { cassandraClient = new CassandraClient(cassandraService.getCassandraHost(), cassandraService.getCQL3Port()); testDataDao = cassandraClient.newTestDataDao(); @@ -76,7 +73,12 @@ public class CamelSourceCassandraITCase extends AbstractKafkaTest { } } - @AfterEach + @BeforeEach + public void setUpTest() { + topicName = getTopicForTest(this); + } + + @AfterAll public void tearDown() { if (testDataDao != null) { try { @@ -87,59 +89,45 @@ public class CamelSourceCassandraITCase extends AbstractKafkaTest { } } - private <T> boolean checkRecord(ConsumerRecord<String, T> record) { - - LOG.debug("Received: {}", record.value()); - received++; - - return false; + @Override + protected void produceTestData() { + // NO-OP (done at the testSetup) } - public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException { - 
connectorPropertyFactory.log(); - - getKafkaConnectService().initializeConnector(connectorPropertyFactory); - - LOG.debug("Creating the consumer ..."); - KafkaClient<String, Row> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); - kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord); - LOG.debug("Created the consumer ..."); + @Override + protected void verifyMessages(TestMessageConsumer<?> consumer) { + int received = consumer.consumedMessages().size(); assertEquals(received, expect, "Didn't process the expected amount of messages"); } - @Timeout(90) @Test public void testRetrieveFromCassandra() throws ExecutionException, InterruptedException { - String topic = TestUtils.getDefaultTestTopic(this.getClass()); - ConnectorPropertyFactory connectorPropertyFactory = CamelCassandraPropertyFactory .basic() - .withKafkaTopic(topic) + .withKafkaTopic(topicName) .withHosts(cassandraService.getCassandraHost()) .withPort(cassandraService.getCQL3Port()) .withKeySpace(TestDataDao.KEY_SPACE) .withResultSetConversionStrategy("ONE") .withCql(testDataDao.getSelectStatement()); - runTest(connectorPropertyFactory); + runTest(connectorPropertyFactory, topicName, expect); } @Timeout(90) @Test public void testRetrieveFromCassandraWithCustomStrategy() throws ExecutionException, InterruptedException { - String topic = TestUtils.getDefaultTestTopic(this.getClass()); - ConnectorPropertyFactory connectorPropertyFactory = CamelCassandraPropertyFactory .basic() - .withKafkaTopic(topic) + .withKafkaTopic(topicName) .withHosts(cassandraService.getCassandraHost()) .withPort(cassandraService.getCQL3Port()) .withKeySpace(TestDataDao.KEY_SPACE) .withResultSetConversionStrategy(classRef(TestResultSetConversionStrategy.class.getName())) .withCql(testDataDao.getSelectStatement()); - runTest(connectorPropertyFactory); + runTest(connectorPropertyFactory, topicName, expect); } }