This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch camel-master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 550d1a2a64771200568ae40be24e5af4aa5448f8 Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Tue Feb 9 09:14:35 2021 +0100 Converted the SSH source test case to use the reusable source base class --- .../ssh/sink/CamelSinkSshITCase.java | 3 +- .../ssh/source/CamelSourceSshITCase.java | 47 +++++++--------------- 2 files changed, 17 insertions(+), 33 deletions(-) diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java index d0535d4..02f6f21 100644 --- a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java +++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java @@ -28,6 +28,7 @@ import org.apache.camel.kafkaconnector.ssh.services.SshService; import org.apache.camel.kafkaconnector.ssh.services.SshServiceFactory; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.condition.DisabledIfSystemProperty; import org.junit.jupiter.api.extension.RegisterExtension; @@ -38,6 +39,7 @@ import static org.junit.jupiter.api.Assertions.fail; @DisabledIfSystemProperty(named = "kafka.instance.type", matches = "local-(kafka|strimzi)-container", disabledReason = "Hangs when running with the embedded Kafka Connect instance") +@TestInstance(TestInstance.Lifecycle.PER_CLASS) public class CamelSinkSshITCase extends CamelSinkTestSupport { @RegisterExtension public static SshService sshService = SshServiceFactory.createService(); @@ -69,7 +71,6 @@ public class CamelSinkSshITCase extends CamelSinkTestSupport { } - @Override protected void consumeMessages(CountDownLatch latch) { latch.countDown(); diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/CamelSourceSshITCase.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/CamelSourceSshITCase.java index 6673c01..488029d 100644 --- a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/CamelSourceSshITCase.java +++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/CamelSourceSshITCase.java @@ -19,36 +19,29 @@ package org.apache.camel.kafkaconnector.ssh.source; import java.util.concurrent.ExecutionException; -import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; -import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; -import org.apache.camel.kafkaconnector.common.utils.TestUtils; +import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport; +import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer; import org.apache.camel.kafkaconnector.ssh.services.SshService; import org.apache.camel.kafkaconnector.ssh.services.SshServiceFactory; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.condition.DisabledIfSystemProperty; import org.junit.jupiter.api.extension.RegisterExtension; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import static org.junit.jupiter.api.Assertions.assertEquals; @DisabledIfSystemProperty(named = "kafka.instance.type", matches = "local-(kafka|strimzi)-container", disabledReason = "Hangs when running with the embedded Kafka Connect instance") @TestInstance(TestInstance.Lifecycle.PER_CLASS) -public class CamelSourceSshITCase extends AbstractKafkaTest { +public class CamelSourceSshITCase extends CamelSourceTestSupport { @RegisterExtension public static SshService sshService = SshServiceFactory.createService(); - private static final Logger LOG = LoggerFactory.getLogger(CamelSourceSshITCase.class); - private final int expect = 1; - private int received; private String oldUserHome = System.getProperty("user.home"); @Override @@ -56,41 +49,32 @@ public class CamelSourceSshITCase extends AbstractKafkaTest { return new String[] {"camel-ssh-kafka-connector"}; } - @BeforeEach + @BeforeAll public void setupKeyHome() { System.setProperty("user.home", "target/user-home"); } - @AfterEach + @AfterAll public void tearDownKeyHome() { System.setProperty("user.home", oldUserHome); } - private <T> boolean checkRecord(ConsumerRecord<String, T> record) { - - LOG.debug("Received: {}", record.value()); - received++; + @Override + protected void produceTestData() { - return false; } - public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException { - connectorPropertyFactory.log(); - - getKafkaConnectService().initializeConnector(connectorPropertyFactory); - - LOG.debug("Creating the consumer ..."); - KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); - kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord); - LOG.debug("Created the consumer ..."); - + @Override + protected void verifyMessages(TestMessageConsumer<?> consumer) { + int received = consumer.consumedMessages().size(); assertEquals(received, expect, "Didn't process the expected amount of messages"); } + @Timeout(90) @Test public void testRetrieveFromSsh() throws ExecutionException, InterruptedException { - String topic = TestUtils.getDefaultTestTopic(this.getClass()); + String topic = getTopicForTest(this); ConnectorPropertyFactory connectorPropertyFactory = CamelSshPropertyFactory .basic() @@ -105,7 +89,6 @@ public class CamelSourceSshITCase extends AbstractKafkaTest { .withEntry("type", "org.apache.camel.kafkaconnector.ssh.transformers.SshTransforms") .end(); - - runTest(connectorPropertyFactory); + runTest(connectorPropertyFactory, topic, expect); } }