This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit ba38537531ac192a13d05a2c7369049a0e21e2ae
Author: Otavio Rodolfo Piske <opi...@redhat.com>
AuthorDate: Tue Feb 9 09:54:30 2021 +0100

    Converted the Timer source test case to use the reusable source base class
---
 .../timer/source/CamelSourceTimerITCase.java | 53 ++++++++--------------
 1 file changed, 18 insertions(+), 35 deletions(-)

diff --git a/tests/itests-timer/src/test/java/org/apache/camel/kafkaconnector/timer/source/CamelSourceTimerITCase.java b/tests/itests-timer/src/test/java/org/apache/camel/kafkaconnector/timer/source/CamelSourceTimerITCase.java
index bc02984..cedb12d 100644
--- a/tests/itests-timer/src/test/java/org/apache/camel/kafkaconnector/timer/source/CamelSourceTimerITCase.java
+++ b/tests/itests-timer/src/test/java/org/apache/camel/kafkaconnector/timer/source/CamelSourceTimerITCase.java
@@ -19,17 +19,12 @@ package org.apache.camel.kafkaconnector.timer.source;
 
 import java.util.concurrent.ExecutionException;
 
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
-import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.Timeout;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
@@ -38,11 +33,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
  * messages
  */
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSourceTimerITCase extends AbstractKafkaTest {
-    private static final Logger LOG = LoggerFactory.getLogger(CamelSourceTimerITCase.class);
-
-    private int received;
+public class CamelSourceTimerITCase extends CamelSourceTestSupport {
     private final int expect = 10;
+    private String topicName;
 
     @Override
     protected String[] getConnectorsInTest() {
@@ -51,53 +44,43 @@ public class CamelSourceTimerITCase extends AbstractKafkaTest {
 
     @BeforeEach
     public void setUp() {
-        received = 0;
+        topicName = getTopicForTest(this);
     }
 
-    private boolean checkRecord(ConsumerRecord<String, String> record) {
-        received++;
-
-        if (received == expect) {
-            return false;
-        }
-
-        return true;
+    @Override
+    protected void produceTestData() {
+        // NO-OP
     }
 
-    private void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
-        connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
-
-        LOG.debug("Creating the consumer ...");
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-        kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
-        LOG.debug("Created the consumer ...");
+    @Override
+    protected void verifyMessages(TestMessageConsumer<?> consumer) {
+        int received = consumer.consumedMessages().size();
 
-        assertEquals(received, expect);
+        assertEquals(expect, received, "Did not receive as many messages as expected");
     }
 
     @Test
-    @Timeout(90)
+    @Timeout(30)
     public void testLaunchConnector() throws ExecutionException, InterruptedException {
         CamelTimerPropertyFactory connectorPropertyFactory = CamelTimerPropertyFactory
                 .basic()
-                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withKafkaTopic(topicName)
                 .withTimerName("launchTest")
                 .withRepeatCount(expect);
 
-        runTest(connectorPropertyFactory);
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 
     @Test
-    @Timeout(90)
+    @Timeout(30)
     public void testLaunchConnectorUsingUrl() throws ExecutionException, InterruptedException {
         CamelTimerPropertyFactory connectorPropertyFactory = CamelTimerPropertyFactory
                 .basic()
-                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withKafkaTopic(topicName)
                 .withUrl("launchTestUsingUrl")
                 .append("repeatCount", expect)
                 .buildUrl();
 
-        runTest(connectorPropertyFactory);
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 }