This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch master in repository
https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 40ac033448e4a051545670fd53a2fc10b715d23f Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Wed Feb 3 12:27:06 2021 +0100 Convert the HDFS tests to the new reusable sink test base class --- .../hdfs/sink/CamelSinkHDFSITCase.java | 95 ++++++++++++---------- 1 file changed, 54 insertions(+), 41 deletions(-) diff --git a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java index 00234b5..c7e7cc3 100644 --- a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java +++ b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java @@ -19,10 +19,12 @@ package org.apache.camel.kafkaconnector.hdfs.sink; import java.io.IOException; import java.net.URISyntaxException; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; -import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; -import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; +import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport; import org.apache.camel.kafkaconnector.common.utils.TestUtils; import org.apache.camel.kafkaconnector.hdfs.utils.HDFSEasy; import org.apache.camel.test.infra.hdfs.v2.services.HDFSService; @@ -38,13 +40,12 @@ import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @TestInstance(TestInstance.Lifecycle.PER_CLASS) -public class CamelSinkHDFSITCase extends AbstractKafkaTest { +public class CamelSinkHDFSITCase extends CamelSinkTestSupport { @RegisterExtension 
public static HDFSService hdfsService = HDFSServiceFactory.createService(); @@ -52,6 +53,7 @@ public class CamelSinkHDFSITCase extends AbstractKafkaTest { private HDFSEasy hdfsEasy; private Path currentBasePath; + private String topicName; private final int expect = 10; @@ -60,9 +62,9 @@ public class CamelSinkHDFSITCase extends AbstractKafkaTest { return new String[] {"camel-hdfs-kafka-connector"}; } - @BeforeEach public void setUp() throws IOException, URISyntaxException { + topicName = getTopicForTest(this); hdfsEasy = new HDFSEasy(hdfsService.getHDFSHost(), hdfsService.getPort()); String currentPath = "/test" + TestUtils.randomWithRange(0, 256) + "/"; @@ -81,54 +83,51 @@ public class CamelSinkHDFSITCase extends AbstractKafkaTest { } } - private boolean filesCreated() { - return hdfsEasy.filesCreated(currentBasePath, expect); + @Override + protected String testMessageContent(int current) { + return "Sink test message: " + current; } + @Override + protected Map<String, String> messageHeaders(String text, int current) { + return null; + } - private String sendKafkaMessages(String baseMessage, int count) throws java.util.concurrent.ExecutionException, InterruptedException { - LOG.info("Sending data to Kafka"); - KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); - - for (int i = 0; i < count; i++) { - kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), baseMessage + i); + @Override + protected void consumeMessages(CountDownLatch latch) { + try { + TestUtils.waitFor(this::filesCreated); + } finally { + latch.countDown(); } - return baseMessage; } - @Test - @Timeout(90) - public void testBasicSendReceive() { - try { - ConnectorPropertyFactory connectorPropertyFactory = CamelHDFSPropertyFactory - .basic() - .withTopics(TestUtils.getDefaultTestTopic(this.getClass())) - .withHostname(hdfsService.getHDFSHost()) - .withPort(hdfsService.getPort()) - .withPath(currentBasePath.getName()) - 
.withSplitStrategy("MESSAGES:1,IDLE:1000"); + @Override + protected void verifyMessages(CountDownLatch latch) throws InterruptedException { + if (latch.await(30, TimeUnit.SECONDS)) { + boolean filesCreated = filesCreated(); + assertTrue(filesCreated, "The files were not created on the remote host"); - connectorPropertyFactory.log(); - getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1); + try { + assertEquals(hdfsEasy.countFiles(currentBasePath), expect, "The number of files created vs expected do not match"); - final String baseMessage = "Sink test message: "; - sendKafkaMessages(baseMessage, expect); + final String baseMessage = "Sink test message: "; + hdfsEasy.listFiles(currentBasePath) + .stream() + .filter(f -> !f.getPath().getName().contains(".opened")) + .forEach(f -> printFile(f, baseMessage)); + } catch (IOException e) { + fail(e.getMessage()); + } - boolean filesCreated = TestUtils.waitFor(this::filesCreated); - assertTrue(filesCreated, "The files were not created on the remote host"); - assertEquals(hdfsEasy.countFiles(currentBasePath), expect, "The number of files created vs expected do not match"); - hdfsEasy.listFiles(currentBasePath) - .stream() - .filter(f -> !f.getPath().getName().contains(".opened")) - .forEach(f -> printFile(f, baseMessage)); - - } catch (Exception e) { - LOG.error("HDFS test failed: {}", e.getMessage(), e); - fail(e.getMessage()); + } else { + fail("Failed to receive the messages within the specified time"); } } - + private boolean filesCreated() { + return hdfsEasy.filesCreated(currentBasePath, expect); + } private void printFile(LocatedFileStatus f, String matchString) { try { @@ -142,4 +141,18 @@ public class CamelSinkHDFSITCase extends AbstractKafkaTest { fail("I/O error: " + e.getMessage()); } } + + @Test + @Timeout(90) + public void testBasicSendReceive() throws Exception { + ConnectorPropertyFactory connectorPropertyFactory = CamelHDFSPropertyFactory + .basic() + .withTopics(topicName) + 
.withHostname(hdfsService.getHDFSHost()) + .withPort(hdfsService.getPort()) + .withPath(currentBasePath.getName()) + .withSplitStrategy("MESSAGES:1,IDLE:1000"); + + runTest(connectorPropertyFactory, topicName, expect); + } }