This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit ba38537531ac192a13d05a2c7369049a0e21e2ae
Author: Otavio Rodolfo Piske <opi...@redhat.com>
AuthorDate: Tue Feb 9 09:54:30 2021 +0100

    Converted the Timer source test case to use the reusable source base class
---
 .../timer/source/CamelSourceTimerITCase.java       | 53 ++++++++--------------
 1 file changed, 18 insertions(+), 35 deletions(-)

diff --git 
a/tests/itests-timer/src/test/java/org/apache/camel/kafkaconnector/timer/source/CamelSourceTimerITCase.java
 
b/tests/itests-timer/src/test/java/org/apache/camel/kafkaconnector/timer/source/CamelSourceTimerITCase.java
index bc02984..cedb12d 100644
--- 
a/tests/itests-timer/src/test/java/org/apache/camel/kafkaconnector/timer/source/CamelSourceTimerITCase.java
+++ 
b/tests/itests-timer/src/test/java/org/apache/camel/kafkaconnector/timer/source/CamelSourceTimerITCase.java
@@ -19,17 +19,12 @@ package org.apache.camel.kafkaconnector.timer.source;
 
 import java.util.concurrent.ExecutionException;
 
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
-import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.Timeout;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
@@ -38,11 +33,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
  * messages
  */
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSourceTimerITCase extends AbstractKafkaTest {
-    private static final Logger LOG = 
LoggerFactory.getLogger(CamelSourceTimerITCase.class);
-
-    private int received;
+public class CamelSourceTimerITCase extends CamelSourceTestSupport {
     private final int expect = 10;
+    private String topicName;
 
     @Override
     protected String[] getConnectorsInTest() {
@@ -51,53 +44,43 @@ public class CamelSourceTimerITCase extends 
AbstractKafkaTest {
 
     @BeforeEach
     public void setUp() {
-        received = 0;
+        topicName = getTopicForTest(this);
     }
 
-    private boolean checkRecord(ConsumerRecord<String, String> record) {
-        received++;
-
-        if (received == expect) {
-            return false;
-        }
-
-        return true;
+    @Override
+    protected void produceTestData() {
+        // NO-OP
     }
 
-    private void runTest(ConnectorPropertyFactory connectorPropertyFactory) 
throws ExecutionException, InterruptedException {
-        connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
-
-        LOG.debug("Creating the consumer ...");
-        KafkaClient<String, String> kafkaClient = new 
KafkaClient<>(getKafkaService().getBootstrapServers());
-        kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), 
this::checkRecord);
-        LOG.debug("Created the consumer ...");
+    @Override
+    protected void verifyMessages(TestMessageConsumer<?> consumer) {
+        int received = consumer.consumedMessages().size();
 
-        assertEquals(received, expect);
+        assertEquals(expect, received, "Did not receive as many messages as 
expected");
     }
 
     @Test
-    @Timeout(90)
+    @Timeout(30)
     public void testLaunchConnector() throws ExecutionException, 
InterruptedException {
         CamelTimerPropertyFactory connectorPropertyFactory = 
CamelTimerPropertyFactory
                 .basic()
-                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withKafkaTopic(topicName)
                 .withTimerName("launchTest")
                 .withRepeatCount(expect);
 
-        runTest(connectorPropertyFactory);
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 
     @Test
-    @Timeout(90)
+    @Timeout(30)
     public void testLaunchConnectorUsingUrl() throws ExecutionException, 
InterruptedException {
         CamelTimerPropertyFactory connectorPropertyFactory = 
CamelTimerPropertyFactory
                 .basic()
-                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withKafkaTopic(topicName)
                 .withUrl("launchTestUsingUrl")
                     .append("repeatCount", expect)
                     .buildUrl();
 
-        runTest(connectorPropertyFactory);
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 }

Reply via email to