This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 7ab14752e618a5fb4b294debcf83a56b379b6e8f Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Wed Feb 3 12:15:23 2021 +0100 Convert the File tests to the new reusable sink test base class --- .../file/sink/CamelSinkFileITCase.java | 122 ++++++++++----------- 1 file changed, 60 insertions(+), 62 deletions(-) diff --git a/tests/itests-file/src/test/java/org/apache/camel/kafkaconnector/file/sink/CamelSinkFileITCase.java b/tests/itests-file/src/test/java/org/apache/camel/kafkaconnector/file/sink/CamelSinkFileITCase.java index 2dbf459..ead6c58 100644 --- a/tests/itests-file/src/test/java/org/apache/camel/kafkaconnector/file/sink/CamelSinkFileITCase.java +++ b/tests/itests-file/src/test/java/org/apache/camel/kafkaconnector/file/sink/CamelSinkFileITCase.java @@ -27,13 +27,12 @@ import java.nio.file.StandardWatchEventKinds; import java.nio.file.WatchEvent; import java.nio.file.WatchKey; import java.nio.file.WatchService; -import java.util.concurrent.ExecutionException; +import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; -import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; -import org.apache.camel.kafkaconnector.common.utils.TestUtils; +import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -43,18 +42,19 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.junit.jupiter.Testcontainers; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @TestInstance(TestInstance.Lifecycle.PER_CLASS) @Testcontainers -public class CamelSinkFileITCase extends AbstractKafkaTest { +public class CamelSinkFileITCase extends CamelSinkTestSupport { private static final Logger LOG = LoggerFactory.getLogger(CamelSinkFileITCase.class); private static final String SINK_DIR = CamelSinkFileITCase.class.getResource(".").getPath(); private static final String FILENAME = "test.txt"; + private String topicName; private final int expect = 1; @Override @@ -64,6 +64,7 @@ public class CamelSinkFileITCase extends AbstractKafkaTest { @BeforeEach public void setUp() { + topicName = getTopicForTest(this); cleanup(); } @@ -79,74 +80,46 @@ public class CamelSinkFileITCase extends AbstractKafkaTest { } } - private void putRecords() { - KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); - - for (int i = 0; i < expect; i++) { - try { - kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "test"); - } catch (ExecutionException e) { - LOG.error("Unable to produce messages: {}", e.getMessage(), e); - } catch (InterruptedException e) { - break; - } - } + @Override + protected String testMessageContent(int current) { + return "test"; } - public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException, IOException { - connectorPropertyFactory.log(); - getKafkaConnectService().initializeConnector(connectorPropertyFactory); - - putRecords(); - - LOG.debug("Created the consumer ... About to receive messages"); - - File sinkFile = new File(SINK_DIR, FILENAME); - File doneFile = new File(SINK_DIR, FILENAME + ".done"); - - waitForFile(sinkFile, doneFile); - - assertTrue(sinkFile.exists(), String.format("The file %s does not exist", sinkFile.getPath())); - - checkFileContents(sinkFile); - + @Override + protected Map<String, String> messageHeaders(String text, int current) { + return null; } - @Test - @Timeout(90) - public void testBasicSendReceive() { + @Override + protected void consumeMessages(CountDownLatch latch) { try { - ConnectorPropertyFactory connectorPropertyFactory = CamelFilePropertyFactory.basic() - .withTopics(TestUtils.getDefaultTestTopic(this.getClass())) - .withDirectoryName(SINK_DIR) - .withFileName(FILENAME) - .withDoneFileName(FILENAME + ".done"); + File sinkFile = new File(SINK_DIR, FILENAME); + File doneFile = new File(SINK_DIR, FILENAME + ".done"); - runTest(connectorPropertyFactory); - - } catch (Exception e) { - LOG.error("HTTP test failed: {}", e.getMessage(), e); + waitForFile(sinkFile, doneFile); + } catch (InterruptedException e) { + fail(e.getMessage()); + } catch (IOException e) { fail(e.getMessage()); + } finally { + latch.countDown(); } } - @Test - @Timeout(90) - public void testBasicSendReceiveUsingUrl() { - try { - ConnectorPropertyFactory connectorPropertyFactory = CamelFilePropertyFactory.basic() - .withTopics(TestUtils.getDefaultTestTopic(this.getClass())) - .withUrl(SINK_DIR) - .append("fileName", FILENAME) - .append("doneFileName", FILENAME + ".done") - .buildUrl(); - + @Override + protected void verifyMessages(CountDownLatch latch) throws InterruptedException { + if (latch.await(30, TimeUnit.SECONDS)) { + File sinkFile = new File(SINK_DIR, FILENAME); - runTest(connectorPropertyFactory); + assertTrue(sinkFile.exists(), String.format("The file %s does not exist", sinkFile.getPath())); - } catch (Exception e) { - LOG.error("HTTP test failed: {}", e.getMessage(), e); - fail(e.getMessage()); + try { + checkFileContents(sinkFile); + } catch (IOException e) { + fail(e.getMessage()); + } + } else { + fail("Failed to receive the messages within the specified time"); } } @@ -212,4 +185,29 @@ public class CamelSinkFileITCase extends AbstractKafkaTest { retries--; } while (!doneFile.exists() && retries > 0); } + + @Test + @Timeout(90) + public void testBasicSendReceive() throws Exception { + ConnectorPropertyFactory connectorPropertyFactory = CamelFilePropertyFactory.basic() + .withTopics(topicName) + .withDirectoryName(SINK_DIR) + .withFileName(FILENAME) + .withDoneFileName(FILENAME + ".done"); + + runTest(connectorPropertyFactory, topicName, expect); + } + + @Test + @Timeout(90) + public void testBasicSendReceiveUsingUrl() throws Exception { + ConnectorPropertyFactory connectorPropertyFactory = CamelFilePropertyFactory.basic() + .withTopics(topicName) + .withUrl(SINK_DIR) + .append("fileName", FILENAME) + .append("doneFileName", FILENAME + ".done") + .buildUrl(); + + runTest(connectorPropertyFactory, topicName, expect); + } }