Repository: incubator-edgent
Updated Branches:
  refs/heads/master a39e92054 -> 14c7989b7
[Edgent-396] JobMonitorApp restarts job 3x more than it should - instrument/validate JobMonitorAppTest - fix JobMonitorApp event filtering Project: http://git-wip-us.apache.org/repos/asf/incubator-edgent/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-edgent/commit/14c7989b Tree: http://git-wip-us.apache.org/repos/asf/incubator-edgent/tree/14c7989b Diff: http://git-wip-us.apache.org/repos/asf/incubator-edgent/diff/14c7989b Branch: refs/heads/master Commit: 14c7989b74edc6ff179053fe8ae3e188ce993176 Parents: a39e920 Author: Dale LaBossiere <dlab...@us.ibm.com> Authored: Tue Mar 14 15:54:39 2017 -0400 Committer: Dale LaBossiere <dlab...@us.ibm.com> Committed: Tue Mar 14 15:54:39 2017 -0400 ---------------------------------------------------------------------- .../edgent/apps/runtime/JobMonitorApp.java | 14 +++++- .../test/apps/runtime/JobMonitorAppTest.java | 47 +++++++++++++++++--- 2 files changed, 54 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/14c7989b/apps/runtime/src/main/java/org/apache/edgent/apps/runtime/JobMonitorApp.java ---------------------------------------------------------------------- diff --git a/apps/runtime/src/main/java/org/apache/edgent/apps/runtime/JobMonitorApp.java b/apps/runtime/src/main/java/org/apache/edgent/apps/runtime/JobMonitorApp.java index 883b023..2a90cc1 100644 --- a/apps/runtime/src/main/java/org/apache/edgent/apps/runtime/JobMonitorApp.java +++ b/apps/runtime/src/main/java/org/apache/edgent/apps/runtime/JobMonitorApp.java @@ -216,9 +216,19 @@ public class JobMonitorApp { logger.trace("Filter: {}", value); try { + // Only trigger on the initial unhealthy event: + // state:RUNNING nextState:RUNNING UNHEALTHY + // Closing the UNHEALTHY job then results in additional UNHEALTHY events + // that we need to ignore: + // RUNNING, CLOSED, UNHEALTHY + // CLOSED, CLOSED, UNHEALTHY JsonObject job = 
JobMonitorAppEvent.getJob(value); return (Job.Health.UNHEALTHY.name().equals( - JobMonitorAppEvent.getJobHealth(job))); + JobMonitorAppEvent.getJobHealth(job)) + && Job.State.RUNNING.name().equals( + JobMonitorAppEvent.getProperty(job, "state")) + && Job.State.RUNNING.name().equals( + JobMonitorAppEvent.getProperty(job, "nextState"))); } catch (IllegalArgumentException e) { logger.info("Invalid event filtered out, cause: {}", e.getMessage()); return false; @@ -246,6 +256,8 @@ public class JobMonitorApp { JsonObject job = JobMonitorAppEvent.getJob(value); String applicationName = JobMonitorAppEvent.getJobName(job); + logger.trace("close and restart: {}", value); + closeJob(applicationName, controlService); submitApplication(applicationName, controlService); } http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/14c7989b/apps/runtime/src/test/java/org/apache/edgent/test/apps/runtime/JobMonitorAppTest.java ---------------------------------------------------------------------- diff --git a/apps/runtime/src/test/java/org/apache/edgent/test/apps/runtime/JobMonitorAppTest.java b/apps/runtime/src/test/java/org/apache/edgent/test/apps/runtime/JobMonitorAppTest.java index 51b129c..d109b0a 100644 --- a/apps/runtime/src/test/java/org/apache/edgent/test/apps/runtime/JobMonitorAppTest.java +++ b/apps/runtime/src/test/java/org/apache/edgent/test/apps/runtime/JobMonitorAppTest.java @@ -55,8 +55,12 @@ public class JobMonitorAppTest { Job monitor = app.submit(); // Declare and register user apps which need monitoring - registerMonitoredApplicationOne(provider); - registerMonitoredApplicationTwo(provider); + AtomicInteger appOneBuildCnt = new AtomicInteger(); + AtomicInteger appOneInjectedErrorCnt = new AtomicInteger(); + AtomicInteger appTwoBuildCnt = new AtomicInteger(); + AtomicInteger appTwoInjectedErrorCnt = new AtomicInteger(); + registerMonitoredApplicationOne(provider, appOneBuildCnt, appOneInjectedErrorCnt); + registerMonitoredApplicationTwo(provider, appTwoBuildCnt, 
appTwoInjectedErrorCnt); // Start monitored apps startMonitoredApplications(provider); @@ -67,6 +71,31 @@ public class JobMonitorAppTest { Job.State.RUNNING.equals(monitor.getCurrentState()) && Job.State.RUNNING.equals(monitor.getNextState()) && Job.Health.HEALTHY.equals(monitor.getHealth())); + + // Verify app restarts. + // Ideally, each app should be started (rebuilt) exactly + // once initially + once for each injected failure. + // Given the timing vagueness of this test allow for + // a little wiggle room - allow one fewer than expected. + + int appOneExpectedBuildCnt = 1 + appOneInjectedErrorCnt.get(); + int appTwoExpectedBuildCnt = 1 + appTwoInjectedErrorCnt.get(); + + int appOneActBuildCnt = appOneBuildCnt.get(); + int appTwoActBuildCnt = appTwoBuildCnt.get(); + + System.out.println("appOne: actBuildCnt: " + appOneActBuildCnt + " expBuildCnt: "+ appOneExpectedBuildCnt); + System.out.println("appTwo: actBuildCnt: " + appTwoActBuildCnt + " expBuildCnt: "+ appTwoExpectedBuildCnt); + + assertTrue("appOne", + appOneActBuildCnt > 1 + && appOneActBuildCnt >= appOneExpectedBuildCnt - 1 + && appOneActBuildCnt <= appOneExpectedBuildCnt); + + assertTrue("appTwo", + appTwoActBuildCnt > 1 + && appTwoActBuildCnt >= appTwoExpectedBuildCnt - 1 + && appTwoActBuildCnt <= appTwoExpectedBuildCnt); } static void startProvider(DirectProvider provider) @@ -81,9 +110,11 @@ public class JobMonitorAppTest { /** * Fails every 2 seconds (20 tuples * 100 millis) */ - static void registerMonitoredApplicationOne(DirectSubmitter<Topology, Job> submitter) { + static void registerMonitoredApplicationOne(DirectSubmitter<Topology, Job> submitter, AtomicInteger topoBuiltCnt, AtomicInteger injectedErrorCnt) { ApplicationService appService = submitter.getServices().getService(ApplicationService.class); appService.registerTopology(MONITORED_APP_NAME_1, (topology, config) -> { + + topoBuiltCnt.incrementAndGet(); Random r = new Random(); TStream<Double> d = topology.poll(() -> r.nextGaussian(), 
100, TimeUnit.MILLISECONDS); @@ -92,7 +123,8 @@ public class JobMonitorAppTest { d = d.filter(tuple -> { int tupleCount = count.incrementAndGet(); if (tupleCount == 20) { - throw new IllegalStateException("Injected error"); + injectedErrorCnt.incrementAndGet(); + throw new IllegalStateException(MONITORED_APP_NAME_1 + " Injected error " + injectedErrorCnt.get()); } return true; }); @@ -104,9 +136,11 @@ public class JobMonitorAppTest { /** * Fails every 1.5 seconds (10 tuples * 150 millis) */ - static void registerMonitoredApplicationTwo(DirectSubmitter<Topology, Job> submitter) { + static void registerMonitoredApplicationTwo(DirectSubmitter<Topology, Job> submitter, AtomicInteger topoBuiltCnt, AtomicInteger injectedErrorCnt) { ApplicationService appService = submitter.getServices().getService(ApplicationService.class); appService.registerTopology(MONITORED_APP_NAME_2, (topology, config) -> { + + topoBuiltCnt.incrementAndGet(); Random r = new Random(); TStream<Double> d = topology.poll(() -> r.nextGaussian(), 150, TimeUnit.MILLISECONDS); @@ -115,7 +149,8 @@ public class JobMonitorAppTest { d = d.filter(tuple -> { int tupleCount = count.incrementAndGet(); if (tupleCount == 10) { - throw new IllegalStateException("Injected error"); + injectedErrorCnt.incrementAndGet(); + throw new IllegalStateException(MONITORED_APP_NAME_2 + " Injected error " + injectedErrorCnt.get()); } return true; });