Repository: incubator-edgent
Updated Branches:
  refs/heads/master a39e92054 -> 14c7989b7


[Edgent-396] JobMonitorApp restarts job 3x more than it should

- instrument/validate JobMonitorAppTest
- fix JobMonitorApp event filtering

Project: http://git-wip-us.apache.org/repos/asf/incubator-edgent/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-edgent/commit/14c7989b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-edgent/tree/14c7989b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-edgent/diff/14c7989b

Branch: refs/heads/master
Commit: 14c7989b74edc6ff179053fe8ae3e188ce993176
Parents: a39e920
Author: Dale LaBossiere <dlab...@us.ibm.com>
Authored: Tue Mar 14 15:54:39 2017 -0400
Committer: Dale LaBossiere <dlab...@us.ibm.com>
Committed: Tue Mar 14 15:54:39 2017 -0400

----------------------------------------------------------------------
 .../edgent/apps/runtime/JobMonitorApp.java      | 14 +++++-
 .../test/apps/runtime/JobMonitorAppTest.java    | 47 +++++++++++++++++---
 2 files changed, 54 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/14c7989b/apps/runtime/src/main/java/org/apache/edgent/apps/runtime/JobMonitorApp.java
----------------------------------------------------------------------
diff --git a/apps/runtime/src/main/java/org/apache/edgent/apps/runtime/JobMonitorApp.java b/apps/runtime/src/main/java/org/apache/edgent/apps/runtime/JobMonitorApp.java
index 883b023..2a90cc1 100644
--- a/apps/runtime/src/main/java/org/apache/edgent/apps/runtime/JobMonitorApp.java
+++ b/apps/runtime/src/main/java/org/apache/edgent/apps/runtime/JobMonitorApp.java
@@ -216,9 +216,19 @@ public class JobMonitorApp {
                     logger.trace("Filter: {}", value);
 
                     try {
+                        // Only trigger on the initial unhealthy event:
+                        //     state:RUNNING nextState:RUNNING UNHEALTHY
+                        // Closing the UNHEALTHY job then results in 
additional UNHEALTHY events
+                        // that we need to ignore:
+                        //     RUNNING, CLOSED, UNHEALTHY
+                        //     CLOSED, CLOSED, UNHEALTHY
                         JsonObject job = JobMonitorAppEvent.getJob(value);
                         return (Job.Health.UNHEALTHY.name().equals(
-                                JobMonitorAppEvent.getJobHealth(job)));
+                                JobMonitorAppEvent.getJobHealth(job))
+                            && Job.State.RUNNING.name().equals(
+                                JobMonitorAppEvent.getProperty(job, "state"))
+                            && Job.State.RUNNING.name().equals(
+                                JobMonitorAppEvent.getProperty(job, 
"nextState")));
                     } catch (IllegalArgumentException e) {
                         logger.info("Invalid event filtered out, cause: {}", 
e.getMessage());
                         return false;
@@ -246,6 +256,8 @@ public class JobMonitorApp {
             JsonObject job = JobMonitorAppEvent.getJob(value);
             String applicationName = JobMonitorAppEvent.getJobName(job);
 
+            logger.trace("close and restart: {}", value);
+            
             closeJob(applicationName, controlService);
             submitApplication(applicationName, controlService);
         }

http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/14c7989b/apps/runtime/src/test/java/org/apache/edgent/test/apps/runtime/JobMonitorAppTest.java
----------------------------------------------------------------------
diff --git a/apps/runtime/src/test/java/org/apache/edgent/test/apps/runtime/JobMonitorAppTest.java b/apps/runtime/src/test/java/org/apache/edgent/test/apps/runtime/JobMonitorAppTest.java
index 51b129c..d109b0a 100644
--- a/apps/runtime/src/test/java/org/apache/edgent/test/apps/runtime/JobMonitorAppTest.java
+++ b/apps/runtime/src/test/java/org/apache/edgent/test/apps/runtime/JobMonitorAppTest.java
@@ -55,8 +55,12 @@ public class JobMonitorAppTest {
         Job monitor = app.submit();
 
         // Declare and register user apps which need monitoring
-        registerMonitoredApplicationOne(provider);
-        registerMonitoredApplicationTwo(provider);
+        AtomicInteger appOneBuildCnt = new AtomicInteger();
+        AtomicInteger appOneInjectedErrorCnt = new AtomicInteger();
+        AtomicInteger appTwoBuildCnt = new AtomicInteger();
+        AtomicInteger appTwoInjectedErrorCnt = new AtomicInteger();
+        registerMonitoredApplicationOne(provider, appOneBuildCnt, 
appOneInjectedErrorCnt);
+        registerMonitoredApplicationTwo(provider, appTwoBuildCnt, 
appTwoInjectedErrorCnt);
 
         // Start monitored apps
         startMonitoredApplications(provider);
@@ -67,6 +71,31 @@ public class JobMonitorAppTest {
                 Job.State.RUNNING.equals(monitor.getCurrentState()) &&
                 Job.State.RUNNING.equals(monitor.getNextState()) &&
                 Job.Health.HEALTHY.equals(monitor.getHealth()));
+        
+        // Verify app restarts.
+        // Ideally, each app should be started (rebuilt) exactly 
+        // once initially + once for each injected failure.
+        // Given the timing vagueness of this test allow for
+        // a little wiggle room - allow one fewer than expected.
+        
+        int appOneExpectedBuildCnt = 1 + appOneInjectedErrorCnt.get();
+        int appTwoExpectedBuildCnt = 1 + appTwoInjectedErrorCnt.get();
+        
+        int appOneActBuildCnt = appOneBuildCnt.get();
+        int appTwoActBuildCnt = appTwoBuildCnt.get();
+
+        System.out.println("appOne: actBuildCnt: " + appOneActBuildCnt + " 
expBuildCnt: "+ appOneExpectedBuildCnt);
+        System.out.println("appTwo: actBuildCnt: " + appTwoActBuildCnt + " 
expBuildCnt: "+ appTwoExpectedBuildCnt);
+        
+        assertTrue("appOne", 
+            appOneActBuildCnt > 1
+            && appOneActBuildCnt >= appOneExpectedBuildCnt - 1
+            && appOneActBuildCnt <= appOneExpectedBuildCnt);
+        
+        assertTrue("appTwo", 
+            appTwoActBuildCnt > 1
+            && appTwoActBuildCnt >= appTwoExpectedBuildCnt - 1
+            && appTwoActBuildCnt <= appTwoExpectedBuildCnt);
     }
 
     static void startProvider(DirectProvider provider) 
@@ -81,9 +110,11 @@ public class JobMonitorAppTest {
     /**
      * Fails every 2 seconds (20 tuples * 100 millis)
      */
-    static void registerMonitoredApplicationOne(DirectSubmitter<Topology, Job> 
submitter) {
+    static void registerMonitoredApplicationOne(DirectSubmitter<Topology, Job> 
submitter, AtomicInteger topoBuiltCnt, AtomicInteger injectedErrorCnt) {
         ApplicationService appService = 
submitter.getServices().getService(ApplicationService.class);
         appService.registerTopology(MONITORED_APP_NAME_1, (topology, config) 
-> {
+          
+                topoBuiltCnt.incrementAndGet();
                 
                 Random r = new Random();
                 TStream<Double> d  = topology.poll(() -> r.nextGaussian(), 
100, TimeUnit.MILLISECONDS);
@@ -92,7 +123,8 @@ public class JobMonitorAppTest {
                 d = d.filter(tuple -> {
                     int tupleCount = count.incrementAndGet();
                     if (tupleCount == 20) {
-                        throw new IllegalStateException("Injected error");
+                        injectedErrorCnt.incrementAndGet();
+                        throw new IllegalStateException(MONITORED_APP_NAME_1 + 
" Injected error " + injectedErrorCnt.get());
                     }
                     return true; 
                 });
@@ -104,9 +136,11 @@ public class JobMonitorAppTest {
     /**
      * Fails every 1.5 seconds (10 tuples * 150 millis)
      */
-    static void registerMonitoredApplicationTwo(DirectSubmitter<Topology, Job> 
submitter) {
+    static void registerMonitoredApplicationTwo(DirectSubmitter<Topology, Job> 
submitter, AtomicInteger topoBuiltCnt, AtomicInteger injectedErrorCnt) {
         ApplicationService appService = 
submitter.getServices().getService(ApplicationService.class);
         appService.registerTopology(MONITORED_APP_NAME_2, (topology, config) 
-> {
+          
+                topoBuiltCnt.incrementAndGet();
                 
                 Random r = new Random();
                 TStream<Double> d  = topology.poll(() -> r.nextGaussian(), 
150, TimeUnit.MILLISECONDS);
@@ -115,7 +149,8 @@ public class JobMonitorAppTest {
                 d = d.filter(tuple -> {
                     int tupleCount = count.incrementAndGet();
                     if (tupleCount == 10) {
-                        throw new IllegalStateException("Injected error");
+                        injectedErrorCnt.incrementAndGet();
+                        throw new IllegalStateException(MONITORED_APP_NAME_2 + 
" Injected error " + injectedErrorCnt.get());
                     }
                     return true; 
                 });

Reply via email to