This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push: new 61ba7a7 Prevent the file test from failing when the test file has been created but no content has been written new 6456a82 Merge pull request #6 from orpiske/fix-camel-file-test 61ba7a7 is described below commit 61ba7a736f94579fb161f0659a47c022ab51cb45 Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Thu Dec 5 15:09:12 2019 +0100 Prevent the file test from failing when the test file has been created but no content has been written --- .../sink/file/CamelSinkFileITCase.java | 51 ++++++++++++++-------- 1 file changed, 34 insertions(+), 17 deletions(-) diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/file/CamelSinkFileITCase.java b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/file/CamelSinkFileITCase.java index a6fb99b..f817dc4 100644 --- a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/file/CamelSinkFileITCase.java +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/file/CamelSinkFileITCase.java @@ -53,6 +53,7 @@ public class CamelSinkFileITCase { private static final String SINK_DIR = CamelSinkFileITCase.class.getResource(".").getPath(); private static final String FILENAME = "test.txt"; + @Rule public KafkaContainer kafka = new KafkaContainer().withEmbeddedZookeeper(); @@ -66,9 +67,14 @@ public class CamelSinkFileITCase { ContainerUtil.waitForInitialization(kafka); LOG.info("Kafka bootstrap server running at address {}", kafka.getBootstrapServers()); - String url = "file://" + SINK_DIR + "?fileName=" + FILENAME; + String url = "file://" + SINK_DIR + "?fileName=" + FILENAME + "&doneFileName=${file:name}.done"; LOG.debug("Saving files to {}", url); + File doneFile = new File(SINK_DIR, FILENAME + ".done"); + if (doneFile.exists()) { + doneFile.delete(); + } + ConnectorPropertyFactory testProperties = new CamelFilePropertyFactory(1, TestCommon.DEFAULT_TEST_TOPIC, url); @@ -103,23 +109,14 @@ public class CamelSinkFileITCase { 
LOG.debug("Created the consumer ... About to receive messages"); File sinkFile = new File(SINK_DIR, FILENAME); - waitForFile(sinkFile); + File doneFile = new File(SINK_DIR, FILENAME + ".done"); - Assert.assertTrue(String.format("The file %s does not exist", sinkFile.getPath()), sinkFile.exists()); + waitForFile(sinkFile, doneFile); - BufferedReader reader = new BufferedReader(new FileReader(sinkFile)); + Assert.assertTrue(String.format("The file %s does not exist", sinkFile.getPath()), sinkFile.exists()); - int i = 0; - String line; - do { - line = reader.readLine(); - if (line != null) { - Assert.assertEquals(String.format("Unexpected data: %s", line), "test", line); - i++; - } - } while (line != null); + checkFileContents(sinkFile); - Assert.assertEquals("Did not receive the same amount of messages that were sent", expect, i); } catch (Exception e) { LOG.error("HTTP test failed: {}", e.getMessage(), e); fail(e.getMessage()); @@ -128,10 +125,30 @@ public class CamelSinkFileITCase { } } - private void waitForFile(File sinkFile) throws IOException, InterruptedException { + private void checkFileContents(File sinkFile) throws IOException { + BufferedReader reader = new BufferedReader(new FileReader(sinkFile)); + + int i = 0; + String line; + do { + line = reader.readLine(); + if (line != null) { + Assert.assertEquals(String.format("Unexpected data: %s", line), "test", line); + i++; + } + } while (line != null); + + Assert.assertEquals("Did not receive the same amount of messages that were sent", expect, i); + } + + private void waitForFile(File sinkFile, File doneFile) throws IOException, InterruptedException { WatchService watchService = FileSystems.getDefault().newWatchService(); Path path = sinkFile.getParentFile().toPath(); + if (doneFile.exists()) { + return; + } + // We watch for both the file creation and truncation path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_MODIFY); @@ -156,7 +173,7 @@ public class 
CamelSinkFileITCase { Path contextPath = (Path) context; - if (contextPath.toString().equals(sinkFile.getName())) { + if (contextPath.toString().equals(doneFile.getName())) { LOG.info("Sink file at the build path {} had a matching event of type: {}", sinkFile.getPath(), event.kind()); @@ -168,6 +185,6 @@ public class CamelSinkFileITCase { } watchKey.reset(); retries--; - } while (!sinkFile.exists() && retries > 0); + } while (!doneFile.exists() && retries > 0); } }