Add test dbg output for testMultiTopologyPollWithError Project: http://git-wip-us.apache.org/repos/asf/incubator-edgent/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-edgent/commit/45df0068 Tree: http://git-wip-us.apache.org/repos/asf/incubator-edgent/tree/45df0068 Diff: http://git-wip-us.apache.org/repos/asf/incubator-edgent/diff/45df0068
Branch: refs/heads/develop Commit: 45df0068b5c222e10fa0e1d4d295759ac08d99ed Parents: 2428f6f Author: Dale LaBossiere <dlab...@us.ibm.com> Authored: Thu Oct 26 16:04:51 2017 -0400 Committer: Dale LaBossiere <dlab...@us.ibm.com> Committed: Thu Oct 26 16:04:51 2017 -0400 ---------------------------------------------------------------------- .../edgent/test/topology/TStreamTest.java | 56 +++++++++++++++++--- .../topology/spi/tester/AbstractTester.java | 6 +++ 2 files changed, 55 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/45df0068/api/topology/src/test/java/org/apache/edgent/test/topology/TStreamTest.java ---------------------------------------------------------------------- diff --git a/api/topology/src/test/java/org/apache/edgent/test/topology/TStreamTest.java b/api/topology/src/test/java/org/apache/edgent/test/topology/TStreamTest.java index 441d43f..464f8e2 100644 --- a/api/topology/src/test/java/org/apache/edgent/test/topology/TStreamTest.java +++ b/api/topology/src/test/java/org/apache/edgent/test/topology/TStreamTest.java @@ -745,14 +745,26 @@ public abstract class TStreamTest extends TopologyAbstractTest { Topology t = newTopology(); TStream<String> s = t.strings("a", "b", "c", "d", "e", "f", "g", "h"); // Throw on the 8th tuple - s.sink((tuple) -> { if ("h".equals(tuple)) throw new RuntimeException("Expected Test Exception");}); + s.sink((tuple) -> { if ("h".equals(tuple)) throw new RuntimeException("MTWE Expected Test Exception");}); // Expect 7 tuples out of 8 Condition<Long> tc = t.getTester().tupleCount(s, 7); - complete(t, tc); +// complete(t, tc); + try { + complete(t, tc); + } catch (Exception e) { + System.err.println("MTWE complete() threw e:"+e); + throw e; + } return true; }); } - waitForCompletion(completer, executions); +// waitForCompletion(completer, executions); + try { + waitForCompletion(completer, executions); + } catch 
(Exception e) { + System.err.println("MTWE waitForCompletion() threw e:"+e); + throw e; + } } /** @@ -762,6 +774,24 @@ public abstract class TStreamTest extends TopologyAbstractTest { @Test public void testMultiTopologyPollWithError() throws Exception { + /* + * It's unclear exactly what this test is supposed to achieve + * (hence unclear how to ensure it's achieving it). + * Is it just trying to verify that a failure in one topology/job + * doesn't affect the execution of another? + * + * The way the test is written I'm not sure there's any guarantee + * that the "Expected Exception" will be generated the appropriate + * number of times. + * Is it possible the completion condition could get evaluated + * true (having seen the 7th tuple) before the 8th tuple is generated + * and processed by the sink fn raising the exception, resulting in + * the job being closed... before the 8th is generated and processed? + * I'm also seeing more "Expected Test Exception" traces than I expected. + * + * Annotate this and the MTWE test a bit to help understand what we're seeing + * in the output. 
+ */ int executions = 4; ExecutorCompletionService<Boolean> completer = new ExecutorCompletionService<>( Executors.newFixedThreadPool(executions)); @@ -771,15 +801,27 @@ public abstract class TStreamTest extends TopologyAbstractTest { AtomicLong n = new AtomicLong(0); TStream<Long> s = t.poll(() -> n.incrementAndGet(), 10, TimeUnit.MILLISECONDS); // Throw on the 8th tuple - s.sink((tuple) -> { if (8 == n.get()) throw new RuntimeException("Expected Test Exception");}); + s.sink((tuple) -> { if (8 == n.get()) throw new RuntimeException("MTPWE Expected Test Exception");}); // Expect 7 tuples out of 8 Condition<Long> tc = t.getTester().tupleCount(s, 7); - complete(t, tc); +// complete(t, tc); + try { + complete(t, tc); + } catch (Exception e) { + System.err.println("MTPWE complete() threw e:"+e); + throw e; + } return true; }); } - waitForCompletion(completer, executions); - } +// waitForCompletion(completer, executions); + try { + waitForCompletion(completer, executions); + } catch (Exception e) { + System.err.println("MTPWE waitForCompletion() threw e:"+e); + throw e; + } + } @Test public void testJoinWithWindow() throws Exception{ http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/45df0068/spi/topology/src/main/java/org/apache/edgent/topology/spi/tester/AbstractTester.java ---------------------------------------------------------------------- diff --git a/spi/topology/src/main/java/org/apache/edgent/topology/spi/tester/AbstractTester.java b/spi/topology/src/main/java/org/apache/edgent/topology/spi/tester/AbstractTester.java index 0c10a08..3dee72a 100644 --- a/spi/topology/src/main/java/org/apache/edgent/topology/spi/tester/AbstractTester.java +++ b/spi/topology/src/main/java/org/apache/edgent/topology/spi/tester/AbstractTester.java @@ -73,8 +73,14 @@ public abstract class AbstractTester implements Tester { Thread.sleep(100); } + if (!endCondition.valid() && getJob().getCurrentState() != State.CLOSED) { + System.err.println("complete(): timed out after " + 
tmoMsec + "msec"); + } + if (getJob().getCurrentState() != State.CLOSED) getJob().stateChange(Job.Action.CLOSE); + else + System.out.println("complete(): Job already closed"); return endCondition.valid(); }