improve PressureReliever tests - add "simple" test that verifies relief/dropping - try to make the "continuous" test better and usable on CI
Project: http://git-wip-us.apache.org/repos/asf/incubator-edgent/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-edgent/commit/39764b53 Tree: http://git-wip-us.apache.org/repos/asf/incubator-edgent/tree/39764b53 Diff: http://git-wip-us.apache.org/repos/asf/incubator-edgent/diff/39764b53 Branch: refs/heads/develop Commit: 39764b534663d5f5ae6f15164cf7dcf16481103f Parents: 177e912 Author: Dale LaBossiere <dlab...@us.ibm.com> Authored: Fri Oct 27 14:31:30 2017 -0400 Committer: Dale LaBossiere <dlab...@us.ibm.com> Committed: Fri Oct 27 14:31:30 2017 -0400 ---------------------------------------------------------------------- .../edgent/test/topology/PlumbingTest.java | 144 +++++++++++++------ 1 file changed, 100 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/39764b53/api/topology/src/test/java/org/apache/edgent/test/topology/PlumbingTest.java ---------------------------------------------------------------------- diff --git a/api/topology/src/test/java/org/apache/edgent/test/topology/PlumbingTest.java b/api/topology/src/test/java/org/apache/edgent/test/topology/PlumbingTest.java index 6fe2e2a..644ac4f 100644 --- a/api/topology/src/test/java/org/apache/edgent/test/topology/PlumbingTest.java +++ b/api/topology/src/test/java/org/apache/edgent/test/topology/PlumbingTest.java @@ -161,67 +161,49 @@ public abstract class PlumbingTest extends TopologyAbstractTest { } @Test - public void testPressureReliever() throws Exception { - // Timing variances on shared machines can cause this test to fail - assumeTrue(!Boolean.getBoolean("edgent.build.ci")); + public void testPressureRelieverDrop() throws Exception { Topology topology = newTopology(); - TStream<TimeAndId> raw = topology.poll(() -> new TimeAndId(), 10, TimeUnit.MILLISECONDS); - + // Verify the pressureReliever drops and retains the most recent when + // backpressure exists. 
+ // + // Here, all the tuples hit the reliever at once, the downstream processing (oneShotDelay) + // causes a backup causing the reliever's queue to become full and drop tuples. + // The first tuple should be processed, then the last (most recent) N (N==queue depth). - TStream<TimeAndId> pr = PlumbingStreams.pressureReliever(raw, Functions.unpartitioned(), 5); + String[] tuples = {"A", "B", "C", "D", "E", "F", "G", "H"}; + String[] expTuples = {"A", "F", "G", "H"}; // with queue depth of 3 + TStream<String> raw = topology.strings(tuples); - // insert a blocking delay acting as downstream operator that cannot keep up - TStream<TimeAndId> slow = PlumbingStreams.blockingDelay(pr, 200, TimeUnit.MILLISECONDS); + TStream<String> pr = PlumbingStreams.pressureReliever(raw, Functions.unpartitioned(), 3); - // calculate the delay - TStream<TimeAndId> slowM = slow.modify(v -> new TimeAndId(v)); - - // Also process raw that should be unaffected by the slow path - TStream<String> processed = raw.asString(); + TStream<String> pr2 = PlumbingStreams.blockingOneShotDelay(pr, 1, TimeUnit.SECONDS); + Condition<Long> tcCount = topology.getTester().tupleCount(pr2, expTuples.length); + Condition<List<String>> contents = topology.getTester().streamContents(pr2, expTuples); + complete(topology, tcCount); - Condition<Long> tcSlowCount = topology.getTester().atLeastTupleCount(slow, 20); - Condition<List<TimeAndId>> tcRaw = topology.getTester().streamContents(raw); - Condition<List<TimeAndId>> tcSlow = topology.getTester().streamContents(slow); - Condition<List<TimeAndId>> tcSlowM = topology.getTester().streamContents(slowM); - Condition<List<String>> tcProcessed = topology.getTester().streamContents(processed); - complete(topology, tcSlowCount); - - assertTrue(tcProcessed.getResult().size() > tcSlowM.getResult().size()); - for (TimeAndId delay : tcSlowM.getResult()) - assertTrue("delay:"+delay, delay.ms < 300); - - // Must not lose any tuples in the non relieving path - Set<TimeAndId> 
uniq = new HashSet<>(tcRaw.getResult()); - assertEquals(tcRaw.getResult().size(), uniq.size()); - - // Must not lose any tuples in the non relieving path - Set<String> uniqProcessed = new HashSet<>(tcProcessed.getResult()); - assertEquals(tcProcessed.getResult().size(), uniqProcessed.size()); - - assertEquals(uniq.size(), uniqProcessed.size()); - - // Might lose tuples, but must not have send duplicates - uniq = new HashSet<>(tcSlow.getResult()); - assertEquals(tcSlow.getResult().size(), uniq.size()); + assertTrue(tcCount.valid()); + assertTrue(contents.valid()); } @Test - public void testPressureRelieverWithInitialDelay() throws Exception { + public void testPressureRelieverNoDrop() throws Exception { Topology topology = newTopology(); + // Same pipeline config as testPressureRelieverDrop but the reliever queue is + // big enough to avoid drops + String[] tuples = {"A", "B", "C", "D", "E", "F", "G", "H"}; + TStream<String> raw = topology.strings(tuples); - TStream<String> raw = topology.strings("A", "B", "C", "D", "E", "F", "G", "H"); + TStream<String> pr = PlumbingStreams.pressureReliever(raw, Functions.unpartitioned(), 100); - TStream<String> pr = PlumbingStreams.pressureReliever(raw, v -> 0, 100); + TStream<String> pr2 = PlumbingStreams.blockingOneShotDelay(pr, 1, TimeUnit.SECONDS); - TStream<String> pr2 = PlumbingStreams.blockingOneShotDelay(pr, 5, TimeUnit.SECONDS); - - Condition<Long> tcCount = topology.getTester().tupleCount(pr2, 8); - Condition<List<String>> contents = topology.getTester().streamContents(pr2, "A", "B", "C", "D", "E", "F", "G", "H"); + Condition<Long> tcCount = topology.getTester().tupleCount(pr2, tuples.length); + Condition<List<String>> contents = topology.getTester().streamContents(pr2, tuples); complete(topology, tcCount); assertTrue(tcCount.valid()); @@ -229,6 +211,80 @@ public abstract class PlumbingTest extends TopologyAbstractTest { } @Test + public void testPressureRelieverContinuous() throws Exception { + // Timing variances on 
shared machines can cause this test to fail + //assumeTrue(!Boolean.getBoolean("edgent.build.ci")); + + // Try to verify more continuous reliever behavior instead of just + // the other pressure reliever tests where the backpressure only exists + // at the beginning. + // + // Generate @ 100tps, consume @ 5tps. + // With reliever depth=1, roughly should process every 20th tuple, with essentially + // no delay in the queue (certainly less than say 50% of the consumer delay, hence < 0.5 * 200ms) + + Topology topology = newTopology(); + + TStream<TimeAndId> raw = topology.poll(() -> new TimeAndId(), 10, TimeUnit.MILLISECONDS); + + TStream<TimeAndId> pr = PlumbingStreams.pressureReliever(raw, Functions.unpartitioned(), 1); + + TStream<TimeAndId> slow = PlumbingStreams.blockingDelay(pr, 200, TimeUnit.MILLISECONDS); + + // calculate the delay (queue time + consumer processing) + TStream<TimeAndId> slowM = slow.modify(v -> new TimeAndId(v)); + + // Also process raw that should be unaffected by the slow path + TStream<TimeAndId> processed = raw.filter(t -> true); + + + Condition<Long> tcSlowMCount = topology.getTester().atLeastTupleCount(slowM, 10); + Condition<List<TimeAndId>> tcSlowM = topology.getTester().streamContents(slowM); + Condition<List<TimeAndId>> tcProcessed = topology.getTester().streamContents(processed); + complete(topology, tcSlowMCount); + + System.out.println(String.format("testPressureRelieverContinuous() fastCnt:%d slowCnt:%d", + tcProcessed.getResult().size(), tcSlowM.getResult().size())); + System.out.println("slow: "+tcSlowM.getResult()); + + // No lost tuples in the fast path (successive Ids, starting @ 1) + assertEquals("fastpath tuples dropped", + tcProcessed.getResult().size(), + tcProcessed.getResult().get(tcProcessed.getResult().size()-1).id); + + // No dup tuples in the fast path + Set<TimeAndId> uniqRaw = new HashSet<>(tcProcessed.getResult()); + assertEquals("fastpath tuples duplicated", tcProcessed.getResult().size(), uniqRaw.size()); 
+ + // fastpath count should be roughly 20x the slow delayed/relieved count + assertTrue("rawCnt:"+tcProcessed.getResult().size()+" slowMCnt:"+tcSlowM.getResult().size(), + tcProcessed.getResult().size() >= 15 * +tcSlowM.getResult().size()); + + // slow should process roughly every 20th tuple... not successive ones + TimeAndId prevId = null; + for (TimeAndId id : tcSlowM.getResult()) { + if (prevId == null) { + // should have processed the 1st tuple + assertEquals("slow firstId", 1, id.id); + } + else { + // seems like this could be sensitive to host load + assertTrue("slow ids prevId:"+prevId+" id:"+id, + id.id >= prevId.id + 15 + && id.id <= prevId.id + 25); + } + prevId = id; + } + + // every slow tuple should be processed near instantaneously - shouldn't wait + // long in the queue. + for (TimeAndId id : tcSlowM.getResult()) { + assertTrue("slow delays prevId:"+prevId+" id:"+id, + id.ms <= 300); // 200ms consumer processing + up to 50% of that waiting + } + } + + @Test public void testValveState() throws Exception { Valve<Integer> valve = new Valve<>(); assertTrue(valve.isOpen());