This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit f2f51b1309586306b4a8fb5124edee176d9d9f97
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Tue Feb 9 09:09:38 2021 +0100

    Converted the SQL source test case to use the reusable source base class
---
 .../sql/source/CamelSourceSQLITCase.java           | 47 ++++++++--------------
 1 file changed, 17 insertions(+), 30 deletions(-)

diff --git 
a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/source/CamelSourceSQLITCase.java
 
b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/source/CamelSourceSQLITCase.java
index 4bf3bd5..6ee744c 100644
--- 
a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/source/CamelSourceSQLITCase.java
+++ 
b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/source/CamelSourceSQLITCase.java
@@ -19,20 +19,15 @@ package org.apache.camel.kafkaconnector.sql.source;
 
 import java.util.concurrent.ExecutionException;
 
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
-import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
 import org.apache.camel.kafkaconnector.sql.services.TestDataSource;
 import org.apache.camel.test.infra.jdbc.services.JDBCService;
 import org.apache.camel.test.infra.jdbc.services.JDBCServiceBuilder;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
 import org.junit.jupiter.api.extension.RegisterExtension;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.JdbcDatabaseContainer;
 import org.testcontainers.containers.PostgreSQLContainer;
 
@@ -40,14 +35,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 
 @DisabledIfSystemProperty(named = "kafka.instance.type", matches = 
"local-(kafka|strimzi)-container",
         disabledReason = "Database connection fails when running with the 
embedded Kafka Connect instance")
-public class CamelSourceSQLITCase extends AbstractKafkaTest {
-    private static final Logger LOG = 
LoggerFactory.getLogger(CamelSourceSQLITCase.class);
-
+public class CamelSourceSQLITCase extends CamelSourceTestSupport {
     @RegisterExtension
     public JDBCService sqlService;
 
     private final int expect = 1;
-    private int received;
 
     public CamelSourceSQLITCase() {
         JdbcDatabaseContainer<?> container = new 
PostgreSQLContainer<>("postgres:9.6.2")
@@ -69,34 +61,29 @@ public class CamelSourceSQLITCase extends AbstractKafkaTest 
{
         return new String[] {"camel-sql-kafka-connector"};
     }
 
-    private <T> boolean checkRecord(ConsumerRecord<String, T> record) {
-
-        LOG.debug("Received: {}", record.value());
-        received++;
-
-        return false;
+    @Override
+    protected void produceTestData() {
+        // NO-OP, already done via init script in the service initialization
     }
 
-    public void runTest(ConnectorPropertyFactory connectorPropertyFactory) 
throws ExecutionException, InterruptedException {
-        connectorPropertyFactory.log();
-
-        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
-
-        LOG.debug("Creating the consumer ...");
-        KafkaClient<String, String> kafkaClient = new 
KafkaClient<>(getKafkaService().getBootstrapServers());
-        kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), 
this::checkRecord);
-        LOG.debug("Created the consumer ...");
-
+    @Override
+    protected void verifyMessages(TestMessageConsumer<?> consumer) {
+        int received = consumer.consumedMessages().size();
         assertEquals(received, expect, "Didn't process the expected amount of 
messages");
     }
 
     @Timeout(30)
     @Test
     public void testDBFetch() throws ExecutionException, InterruptedException {
-        CamelSqlPropertyFactory factory = 
CamelSqlPropertyFactory.basic().withDataSource(CamelSqlPropertyFactory.classRef(TestDataSource.class.getName()))
-            .withQuery("select * from 
test").withTopics(TestUtils.getDefaultTestTopic(this.getClass()));
+        String topicName = getTopicForTest(this);
+
+        CamelSqlPropertyFactory factory = CamelSqlPropertyFactory
+                .basic()
+                
.withDataSource(CamelSqlPropertyFactory.classRef(TestDataSource.class.getName()))
+                .withQuery("select * from test")
+                .withTopics(topicName);
 
-        runTest(factory);
+        runTest(factory, topicName, expect);
 
     }
 }

Reply via email to