change text file writer tests to complete on condition instead of TMO now generally completes quicker and isn't artifically slowed buy the edgent.build.ci TMO multiplier change
Project: http://git-wip-us.apache.org/repos/asf/incubator-edgent/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-edgent/commit/e231c41a Tree: http://git-wip-us.apache.org/repos/asf/incubator-edgent/tree/e231c41a Diff: http://git-wip-us.apache.org/repos/asf/incubator-edgent/diff/e231c41a Branch: refs/heads/develop Commit: e231c41ac38ede13d9cdc20473f09e8ea402e323 Parents: d402541 Author: Dale LaBossiere <dlab...@us.ibm.com> Authored: Wed Oct 25 14:35:12 2017 -0400 Committer: Dale LaBossiere <dlab...@us.ibm.com> Committed: Wed Oct 25 14:35:12 2017 -0400 ---------------------------------------------------------------------- .../file/FileStreamsTextFileWriterTest.java | 140 +++++++++++++------ 1 file changed, 98 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/e231c41a/connectors/file/src/test/java/org/apache/edgent/test/connectors/file/FileStreamsTextFileWriterTest.java ---------------------------------------------------------------------- diff --git a/connectors/file/src/test/java/org/apache/edgent/test/connectors/file/FileStreamsTextFileWriterTest.java b/connectors/file/src/test/java/org/apache/edgent/test/connectors/file/FileStreamsTextFileWriterTest.java index 4c0b9cb..fe3b54a 100644 --- a/connectors/file/src/test/java/org/apache/edgent/test/connectors/file/FileStreamsTextFileWriterTest.java +++ b/connectors/file/src/test/java/org/apache/edgent/test/connectors/file/FileStreamsTextFileWriterTest.java @@ -28,7 +28,6 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.junit.Assume.assumeTrue; import java.io.BufferedInputStream; import java.io.BufferedReader; @@ -318,6 +317,8 @@ public class FileStreamsTextFileWriterTest extends DirectTopologyTestBase { // default writer policy TSink<String> sink = FileStreams.textFileWriter(s, () -> basePath.toString()); + // note, with only 4 tuples, default policy won't cycle (finalize the cur file) + // to make the expResults present until job stops (TMO) completeAndValidateWriter(t, TMO_SEC, basePath, expResults); assertNotNull(sink); @@ -447,7 +448,10 @@ public class FileStreamsTextFileWriterTest extends DirectTopologyTestBase { // The mechanisms of this test no longer work with the // recent changes to bump the tmo 2x when edgent.build.ci=true. // So disable while we continue to work on this. - assumeTrue(!Boolean.getBoolean("edgent.build.ci")); + // + // With the general changes for completion condition checking, + // I think this has the chance of working again. +// assumeTrue(!Boolean.getBoolean("edgent.build.ci")); Topology t = newTopology("testRetainAgeBased"); @@ -458,9 +462,8 @@ public class FileStreamsTextFileWriterTest extends DirectTopologyTestBase { // build expected results int keepCnt = 2; // only keep the last n files with throttling, age, - // and TMO_SEC int ageSec = 5; - long periodMsec = TimeUnit.SECONDS.toMillis(1); + long periodMsec = TimeUnit.SECONDS.toMillis(1); // age retention checking period // net one tuple per file List<List<String>> expResults = buildExpResults(lines, tuple -> true); for (int i = 0; i < keepCnt; i++) @@ -475,9 +478,11 @@ public class FileStreamsTextFileWriterTest extends DirectTopologyTestBase { // With 4 tuples, throttleDelay=2sec, and ageSec=5 // t0=add-f1, t1, t2=add-f2, t3, t4=add-f3, t5-rm-f1, t6=add-f4, t7=rm-f2, t8, t9=rm-f3, ... // - // So we want to check somewhere around t8 (after t7 and definitely before t9) - // so all 4 files were created and the first 2 have been aged out. - // with complete delay = #files-1*throttle + TMO_SEC, should be 6+2 == t8. + // The expected results happen somewhere around t8 (after t7 and definitely before t9), + // all 4 files were created and the first 2 have been aged out. + // + // If the "completion condition" doesn't manage to get run during that interval + // the test will fail even if the code is working fine. int throttleSec = 2; TStream<String> s = PlumbingStreams.blockingThrottle( @@ -511,7 +516,7 @@ public class FileStreamsTextFileWriterTest extends DirectTopologyTestBase { IFileWriterPolicy<String> policy = new FileWriterPolicy<String>( FileWriterFlushConfig.newImplicitConfig(), - FileWriterCycleConfig.newCountBasedConfig(1000), + FileWriterCycleConfig.newCountBasedConfig(expResults.get(0).size()), FileWriterRetentionConfig.newFileCountBasedConfig(10) ); FileStreams.textFileWriter(s, () -> basePath.toString(), () -> policy); @@ -536,7 +541,7 @@ public class FileStreamsTextFileWriterTest extends DirectTopologyTestBase { IFileWriterPolicy<String> policy = new FileWriterPolicy<String>( FileWriterFlushConfig.newCountBasedConfig(1), // every tuple - FileWriterCycleConfig.newCountBasedConfig(1000), // all in 1 file + FileWriterCycleConfig.newCountBasedConfig(expResults.get(0).size()), // all in 1 file FileWriterRetentionConfig.newFileCountBasedConfig(10) ); FileStreams.textFileWriter(s, () -> basePath.toString(), () -> policy); @@ -565,7 +570,7 @@ public class FileStreamsTextFileWriterTest extends DirectTopologyTestBase { IFileWriterPolicy<String> policy = new FileWriterPolicy<String>( FileWriterFlushConfig.newTimeBasedConfig(TimeUnit.MILLISECONDS.toMillis(250)), - FileWriterCycleConfig.newCountBasedConfig(1000), // all in 1 file + FileWriterCycleConfig.newCountBasedConfig(expResults.get(0).size()), // all in 1 file FileWriterRetentionConfig.newFileCountBasedConfig(10) ); FileStreams.textFileWriter(s, () -> basePath.toString(), () -> policy); @@ -592,7 +597,7 @@ public class FileStreamsTextFileWriterTest extends DirectTopologyTestBase { IFileWriterPolicy<String> policy = new FileWriterPolicy<String>( FileWriterFlushConfig.newPredicateBasedConfig( tuple -> tuple.startsWith("1-") || tuple.startsWith("3-")), - FileWriterCycleConfig.newCountBasedConfig(1000), // all in 1 file + FileWriterCycleConfig.newCountBasedConfig(expResults.get(0).size()), // all in 1 file FileWriterRetentionConfig.newFileCountBasedConfig(10) ); FileStreams.textFileWriter(s, () -> basePath.toString(), () -> policy); @@ -698,7 +703,9 @@ public class FileStreamsTextFileWriterTest extends DirectTopologyTestBase { // build expected results // a tuple based config / predicate. in this case should end up with 3 files. - Predicate<String> cycleIt = tuple -> tuple.startsWith("1-") || tuple.startsWith("3-"); + // flush on the last tuple too to ensure the test completes before TMO. + Predicate<String> cycleIt = tuple -> tuple.startsWith("1-") || tuple.startsWith("3-") + || tuple.equals(lines[lines.length-1]); List<List<String>> expResults = buildExpResults(lines, cycleIt); assertEquals(3, expResults.size()); @@ -869,34 +876,59 @@ public class FileStreamsTextFileWriterTest extends DirectTopologyTestBase { Path basePath, List<List<T>> expResults) throws Exception { try { - // just let it run to the tmo before we check the file contents + // wait until the right number of files and content or we timeout. + // (don't use a wait-till-tmo scheme as that's "too slow" especially + // when complete() adds a TMO multiplier when edgent.build.ci=true) Condition<Object> tc = new Condition<Object>() { - public boolean valid() { return false; } - public Object getResult() { return null; } + public boolean valid() { + try { + return checkFiles(basePath, expResults, true); + } catch (Exception e) { + return false; + } + } + public Object getResult() { return getActFiles(basePath).size(); } }; - - complete(t, tc, tmoSec, TimeUnit.SECONDS); - System.out.println("########## "+t.getName()); + // if we time out we probably need to know which files are present to diagnose + try { + complete(t, tc, tmoSec, TimeUnit.SECONDS); + } catch(Exception e) { + System.out.println("completed with exception: "+e); + } - // right number of files? - List<Path> actFiles = getActFiles(basePath); - System.out.println("actFiles: "+actFiles); - assertEquals(actFiles.toString(), expResults.size(), actFiles.size()); + System.out.println("########## "+t.getName()); - // do the file(s) have the right contents? - System.out.println("expResults: "+expResults); - int i = 0; - for (List<T> expFile : expResults) { - Path path = actFiles.get(i++); - checkContents(path, expFile.toArray(new String[0])); - } + checkFiles(basePath, expResults, false); } finally { deleteAll(basePath); } } + // silent==true => return false on fail; silent==false => asserts/throws on fail + private <T> boolean checkFiles(Path basePath, List<List<T>> expResults, boolean silent) { + + // right number of files? + List<Path> actFiles = getActFiles(basePath); + if (!silent) System.out.println("actFiles: "+actFiles); + if (!silent) + assertEquals(actFiles.toString(), expResults.size(), actFiles.size()); + else if (expResults.size() != actFiles.size()) + return false; + + // do the file(s) have the right contents? + if (!silent) System.out.println("expResults: "+expResults); + int i = 0; + for (List<T> expFile : expResults) { + Path path = actFiles.get(i++); + if (!checkContents(path, expFile.toArray(new String[0]), silent)) + return false; + } + + return true; + } + private void deleteAll(Path basePath) { Path parent = basePath.getParent(); String baseLeaf = basePath.getFileName().toString(); @@ -920,28 +952,39 @@ public class FileStreamsTextFileWriterTest extends DirectTopologyTestBase { return paths; } - private void checkContents(Path path, String[] lines) { + // silent==true => return false on fail; silent==false => asserts/throws on fail + private boolean checkContents(Path path, String[] lines, boolean silent) { if (path.getFileName().toString().endsWith(".zip")) { - checkZipContents(path, lines); - return; + return checkZipContents(path, lines, silent); } - System.out.println("checking file "+path); + if (!silent) System.out.println("checking file "+path); int lineCnt = 0; try (BufferedReader br = newBufferedReader(path)) { for (String line : lines) { ++lineCnt; String actLine = br.readLine(); - assertEquals("path:"+path+" line "+lineCnt, line, actLine); + if (!silent) + assertEquals("path:"+path+" line "+lineCnt, line, actLine); + else if (!line.equals(actLine)) + return false; } - assertNull("path:"+path+" line "+lineCnt+" expected EOF", br.readLine()); + if (!silent) + assertNull("path:"+path+" line "+lineCnt+" expected EOF", br.readLine()); + else if (null != br.readLine()) + return false; } catch (IOException e) { - assertNull("path:"+path+" line "+lineCnt+" unexpected IOException "+e, e); + if (!silent) + assertNull("path:"+path+" line "+lineCnt+" unexpected IOException "+e, e); + else + return false; } + return true; } - private void checkZipContents(Path path, String[] lines) { - System.out.println("checking file "+path); + // silent==true => return false on fail; silent==false => asserts/throws on fail + private boolean checkZipContents(Path path, String[] lines, boolean silent) { + if (!silent) System.out.println("checking file "+path); String fileName = path.getFileName().toString(); String entryName = fileName.substring(0, fileName.length() - ".zip".length()); int lineCnt = 0; @@ -952,18 +995,31 @@ public class FileStreamsTextFileWriterTest extends DirectTopologyTestBase { { ZipEntry entry = zin.getNextEntry(); - assertEquals(entryName, entry.getName()); + if (!silent) + assertEquals(entryName, entry.getName()); + else if (!entryName.equals(entry.getName())) + return false; BufferedReader br = new BufferedReader(new InputStreamReader(zin, StandardCharsets.UTF_8)); for (String line : lines) { ++lineCnt; String actLine = br.readLine(); - assertEquals("path:"+path+" line "+lineCnt, line, actLine); + if (!silent) + assertEquals("path:"+path+" line "+lineCnt, line, actLine); + else if (!line.equals(actLine)) + return false; } - assertNull("path:"+path+" line "+lineCnt+" expected EOF", br.readLine()); + if (!silent) + assertNull("path:"+path+" line "+lineCnt+" expected EOF", br.readLine()); + else if (null != br.readLine()) + return false; } catch (IOException e) { - assertNull("path:"+path+" line "+lineCnt+" unexpected IOException "+e, e); + if (!silent) + assertNull("path:"+path+" line "+lineCnt+" unexpected IOException "+e, e); + else + return false; } + return true; } }