This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 36f3f9be64e3c4bcece6291742449342ecd69ba5 Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Wed Feb 3 15:51:28 2021 +0100 Convert the SSH tests to the new reusable sink test base class --- .../ssh/sink/CamelSinkSshITCase.java | 67 +++++++++++----------- 1 file changed, 32 insertions(+), 35 deletions(-) diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java index cf7e9dd..1c71719 100644 --- a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java +++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java @@ -17,18 +17,16 @@ package org.apache.camel.kafkaconnector.ssh.sink; +import java.util.Map; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; -import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; +import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport; import org.apache.camel.kafkaconnector.common.utils.TestUtils; import org.apache.camel.kafkaconnector.ssh.services.SshService; import org.apache.camel.kafkaconnector.ssh.services.SshServiceFactory; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.condition.DisabledIfSystemProperty; @@ -40,46 +38,42 @@ import static org.junit.jupiter.api.Assertions.fail; @DisabledIfSystemProperty(named = "kafka.instance.type", matches = "local-(kafka|strimzi)-container", disabledReason = "Hangs when running with the embedded Kafka Connect instance") -public class CamelSinkSshITCase extends AbstractKafkaTest { +public class CamelSinkSshITCase extends CamelSinkTestSupport { @RegisterExtension public static SshService sshService = SshServiceFactory.createService(); private static final Logger LOG = LoggerFactory.getLogger(CamelSinkSshITCase.class); private final int expect = 3; + private String topic; @Override protected String[] getConnectorsInTest() { return new String[] {"camel-ssh-kafka-connector"}; } - private void putRecords(CountDownLatch latch) { - KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); - - try { - for (int i = 0; i < expect; i++) { - try { - kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "date"); - } catch (ExecutionException e) { - LOG.error("Unable to produce messages: {}", e.getMessage(), e); - } catch (InterruptedException e) { - break; - } - } - } finally { - latch.countDown(); - } + @BeforeEach + public void setUp() { + topic = TestUtils.getDefaultTestTopic(this.getClass()); } - public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException { - connectorPropertyFactory.log(); + @Override + protected String testMessageContent(int current) { + return "date"; + } - getKafkaConnectService().initializeConnector(connectorPropertyFactory); + @Override + protected Map<String, String> messageHeaders(String text, int current) { + return null; + } - CountDownLatch latch = new CountDownLatch(1); - ExecutorService service = Executors.newCachedThreadPool(); - service.submit(() -> putRecords(latch)); + @Override + protected void consumeMessages(CountDownLatch latch) { + latch.countDown(); + } + @Override + protected void verifyMessages(CountDownLatch latch) throws InterruptedException { if (!latch.await(30, TimeUnit.SECONDS)) { fail("Timed out wait for data to be added to the Kafka cluster"); } @@ -87,12 +81,15 @@ public class CamelSinkSshITCase extends AbstractKafkaTest { @Timeout(90) @Test - public void testSshCommand() throws ExecutionException, InterruptedException { - String topic = TestUtils.getDefaultTestTopic(this.getClass()); - - ConnectorPropertyFactory connectorPropertyFactory = CamelSshPropertyFactory.basic().withTopics(topic).withHost(sshService.getSshHost()) - .withPort(Integer.toString(sshService.getSshPort())).withUsername("root").withPassword("root"); - - runTest(connectorPropertyFactory); + public void testSshCommand() throws Exception { + ConnectorPropertyFactory connectorPropertyFactory = CamelSshPropertyFactory + .basic() + .withTopics(topic) + .withHost(sshService.getSshHost()) + .withPort(Integer.toString(sshService.getSshPort())) + .withUsername("root") + .withPassword("root"); + + runTest(connectorPropertyFactory, topic, expect); } }