This is an automated email from the ASF dual-hosted git repository.

pdallig pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new debfa7fcc2 [ZEPPELIN-5900] Fix RemoteSchedulerTest (#4582)
debfa7fcc2 is described below

commit debfa7fcc23d6f5ea2f0664308cdbe7eba78b419
Author: Philipp Dallig <philipp.dal...@gmail.com>
AuthorDate: Tue May 2 08:37:44 2023 +0200

    [ZEPPELIN-5900] Fix RemoteSchedulerTest (#4582)
---
 .../zeppelin/scheduler/AbstractScheduler.java      | 24 +++---
 .../java/org/apache/zeppelin/scheduler/Job.java    |  8 ++
 .../interpreter/remote/RemoteInterpreter.java      | 14 ++--
 .../apache/zeppelin/scheduler/RemoteScheduler.java | 85 ++++++++++++----------
 .../zeppelin/scheduler/RemoteSchedulerTest.java    |  1 +
 5 files changed, 72 insertions(+), 60 deletions(-)

diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java
index fdd456be18..0f95088df3 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java
@@ -36,7 +36,7 @@ public abstract class AbstractScheduler implements Scheduler {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(AbstractScheduler.class);
 
-  protected String name;
+  protected final String name;
   protected volatile boolean terminate = false;
   protected BlockingQueue<Job> queue = new LinkedBlockingQueue<>();
   protected Map<String, Job> jobs = new ConcurrentHashMap<>();
@@ -89,11 +89,14 @@ public abstract class AbstractScheduler implements 
Scheduler {
         runningJob = queue.take();
       } catch (InterruptedException e) {
         LOGGER.warn("{} is interrupted", getClass().getSimpleName());
+        // Restore interrupted state...
+        Thread.currentThread().interrupt();
         break;
       }
 
       runJobInScheduler(runningJob);
     }
+    stop();
   }
 
   public abstract void runJobInScheduler(Job job);
@@ -124,7 +127,7 @@ public abstract class AbstractScheduler implements 
Scheduler {
       return;
     }
 
-    LOGGER.info("Job {} started by scheduler {}",runningJob.getId(), name);
+    LOGGER.info("Job {} started by scheduler {}", runningJob.getId(), name);
     // Don't set RUNNING status when it is RemoteScheduler, update it via 
JobStatusPoller
     if (!getClass().getSimpleName().equals("RemoteScheduler")) {
       runningJob.setStatus(Job.Status.RUNNING);
@@ -133,21 +136,14 @@ public abstract class AbstractScheduler implements 
Scheduler {
     Object jobResult = runningJob.getReturn();
     synchronized (runningJob) {
       if (runningJob.isAborted()) {
+        LOGGER.debug("Job Aborted, {}, {}", runningJob.getId(), 
runningJob.getErrorMessage());
         runningJob.setStatus(Job.Status.ABORT);
-        LOGGER.debug("Job Aborted, " + runningJob.getId() + ", " +
-                runningJob.getErrorMessage());
-      } else if (runningJob.getException() != null) {
-        LOGGER.debug("Job Error, " + runningJob.getId() + ", " +
-                runningJob.getReturn());
-        runningJob.setStatus(Job.Status.ERROR);
-      } else if (jobResult != null && jobResult instanceof InterpreterResult
-              && ((InterpreterResult) jobResult).code() == 
InterpreterResult.Code.ERROR) {
-        LOGGER.debug("Job Error, " + runningJob.getId() + ", " +
-                runningJob.getReturn());
+      } else if (runningJob.getException() != null || (jobResult instanceof 
InterpreterResult
+          && ((InterpreterResult) jobResult).code() == 
InterpreterResult.Code.ERROR)) {
+        LOGGER.debug("Job Error, {}, {}", runningJob.getId(), 
runningJob.getReturn());
         runningJob.setStatus(Job.Status.ERROR);
       } else {
-        LOGGER.debug("Job Finished, " + runningJob.getId() + ", Result: " +
-                runningJob.getReturn());
+        LOGGER.debug("Job Finished, {}, Result: {}", runningJob.getId(), 
runningJob.getReturn());
         runningJob.setStatus(Job.Status.FINISHED);
       }
     }
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
index 7d06d490c9..55d76d8e94 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
@@ -70,6 +70,14 @@ public abstract class Job<T> {
     public boolean isCompleted() {
       return this == FINISHED || this == ERROR || this == ABORT;
     }
+
+    public boolean isAbort() {
+      return this == ABORT;
+    }
+
+    public boolean isFailed() {
+      return this == ERROR || this == ABORT;
+    }
   }
 
   private String jobName;
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
index 02f1b624ba..ba3be51048 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -30,7 +30,6 @@ import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.LifecycleManager;
 import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
@@ -342,17 +341,14 @@ public class RemoteInterpreter extends Interpreter {
     // running under the scheduler of this session will be aborted.
     String executionMode = getProperty(".execution.mode", "paragraph");
     if (executionMode.equals("paragraph")) {
-      Scheduler s = new RemoteScheduler(
-              RemoteInterpreter.class.getSimpleName() + "-" + 
getInterpreterGroup().getId() + "-" + sessionId,
-              SchedulerFactory.singleton().getExecutor(),
-              this);
+      String name = RemoteInterpreter.class.getSimpleName() + "-" + 
getInterpreterGroup().getId()
+          + "-" + sessionId;
+      Scheduler s = new RemoteScheduler(name, this);
       return SchedulerFactory.singleton().createOrGetScheduler(s);
     } else if (executionMode.equals("note")) {
       String noteId = getProperty(".noteId");
-      Scheduler s = new RemoteScheduler(
-              RemoteInterpreter.class.getSimpleName() + "-" + noteId,
-              SchedulerFactory.singleton().getExecutor(),
-              this);
+      String name = RemoteInterpreter.class.getSimpleName() + "-" + noteId;
+      Scheduler s = new RemoteScheduler(name, this);
       return SchedulerFactory.singleton().createOrGetScheduler(s);
     } else {
       throw new RuntimeException("Invalid execution mode: " + executionMode);
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
index d8797fff89..8c068f9f60 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
@@ -24,7 +24,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * RemoteScheduler runs in ZeppelinServer and proxies Scheduler running on 
RemoteInterpreter.
@@ -34,14 +36,14 @@ import java.util.concurrent.TimeUnit;
 public class RemoteScheduler extends AbstractScheduler {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(RemoteScheduler.class);
 
-  private RemoteInterpreter remoteInterpreter;
-  private ExecutorService executor;
+  private final RemoteInterpreter remoteInterpreter;
+  private final ExecutorService executor;
 
   public RemoteScheduler(String name,
-                         ExecutorService executor,
                          RemoteInterpreter remoteInterpreter) {
     super(name);
-    this.executor = executor;
+    this.executor =
+        Executors.newSingleThreadExecutor(new SchedulerThreadFactory("FIFO-" + 
name + "-"));
     this.remoteInterpreter = remoteInterpreter;
   }
 
@@ -53,22 +55,26 @@ public class RemoteScheduler extends AbstractScheduler {
             remoteInterpreter.getProperty(".execution.mode", "paragraph");
     if (executionMode.equals("paragraph")) {
       // wait until it is submitted to the remote
-      while (!jobRunner.isJobSubmittedInRemote()) {
+      while (!jobRunner.isJobSubmittedInRemote() && 
!Thread.currentThread().isInterrupted()) {
         try {
           Thread.sleep(100);
         } catch (InterruptedException e) {
           LOGGER.error("Exception in RemoteScheduler while 
jobRunner.isJobSubmittedInRemote " +
                   "queue.wait", e);
+          // Restore interrupted state...
+          Thread.currentThread().interrupt();
         }
       }
     } else if (executionMode.equals("note")){
       // wait until it is finished
-      while (!jobRunner.isJobExecuted()) {
+      while (!jobRunner.isJobExecuted() && 
!Thread.currentThread().isInterrupted()) {
         try {
           Thread.sleep(100);
         } catch (InterruptedException e) {
           LOGGER.error("Exception in RemoteScheduler while 
jobRunner.isJobExecuted " +
                   "queue.wait", e);
+          // Restore interrupted state...
+          Thread.currentThread().interrupt();
         }
       }
     } else {
@@ -82,10 +88,11 @@ public class RemoteScheduler extends AbstractScheduler {
    * RUNNING status. This thread will exist after job is in RUNNING/FINISHED 
state.
    */
   private class JobStatusPoller extends Thread {
-    private long checkIntervalMsec;
-    private volatile boolean terminate;
-    private JobListener listener;
-    private Job job;
+    private final Logger logger = LoggerFactory.getLogger(JobRunner.class);
+    private final long checkIntervalMsec;
+    private final AtomicBoolean terminate;
+    private final JobListener listener;
+    private final Job job;
     private volatile Status lastStatus;
 
     public JobStatusPoller(Job job,
@@ -95,41 +102,38 @@ public class RemoteScheduler extends AbstractScheduler {
       this.checkIntervalMsec = checkIntervalMsec;
       this.job = job;
       this.listener = listener;
-      this.terminate = false;
+      this.terminate = new AtomicBoolean(false);
     }
 
     @Override
     public void run() {
-      while (terminate == false) {
-        synchronized (this) {
-          try {
-            this.wait(checkIntervalMsec);
-          } catch (InterruptedException e) {
-            LOGGER.error("Exception in RemoteScheduler while run this.wait", 
e);
-          }
-        }
-
-        if (terminate) {
-          // terminated by shutdown
-          break;
-        }
-
+      while (!terminate.get() && !Thread.currentThread().isInterrupted()) {
         Status newStatus = getStatus();
         if (newStatus == Status.RUNNING ||
                 newStatus == Status.FINISHED ||
                 newStatus == Status.ERROR ||
                 newStatus == Status.ABORT) {
           // Exit this thread when job is in RUNNING/FINISHED/ERROR/ABORT 
state.
-          break;
+          terminate.set(true);
+        } else {
+          synchronized (terminate) {
+            try {
+              terminate.wait(checkIntervalMsec);
+            } catch (InterruptedException e) {
+              logger.error("Exception in RemoteScheduler while run this.wait", 
e);
+              // Restore interrupted state...
+              Thread.currentThread().interrupt();
+            }
+          }
         }
       }
-      terminate = true;
+      terminate.set(true);
     }
 
     public void shutdown() {
-      terminate = true;
-      synchronized (this) {
-        this.notify();
+      synchronized (terminate) {
+        terminate.set(true);
+        terminate.notifyAll();
       }
     }
 
@@ -155,8 +159,8 @@ public class RemoteScheduler extends AbstractScheduler {
 
   private class JobRunner implements Runnable, JobListener {
     private final Logger logger = LoggerFactory.getLogger(JobRunner.class);
-    private RemoteScheduler scheduler;
-    private Job job;
+    private final RemoteScheduler scheduler;
+    private final Job job;
     private volatile boolean jobExecuted;
     private volatile boolean jobSubmittedRemotely;
 
@@ -187,6 +191,8 @@ public class RemoteScheduler extends AbstractScheduler {
         jobStatusPoller.join();
       } catch (InterruptedException e) {
         logger.error("JobStatusPoller interrupted", e);
+        // Restore interrupted state...
+        Thread.currentThread().interrupt();
       }
     }
 
@@ -197,7 +203,11 @@ public class RemoteScheduler extends AbstractScheduler {
     // Call by JobStatusPoller thread, update status when JobStatusPoller get 
new status.
     @Override
     public void onStatusChange(Job job, Status before, Status after) {
-      if (jobExecuted == false) {
+      if (!job.equals(this.job)) {
+        logger.error("StatusChange for an unkown job. {} != {}", 
this.job.getId(), job.getId());
+        return;
+      }
+      if (!jobExecuted) {
         if (after == Status.FINISHED || after == Status.ABORT
                 || after == Status.ERROR) {
           // it can be status of last run.
@@ -205,7 +215,7 @@ public class RemoteScheduler extends AbstractScheduler {
           return;
         } else if (after == Status.RUNNING) {
           jobSubmittedRemotely = true;
-          job.setStatus(Status.RUNNING);
+          this.job.setStatus(Status.RUNNING);
         }
       } else {
         jobSubmittedRemotely = true;
@@ -214,9 +224,9 @@ public class RemoteScheduler extends AbstractScheduler {
       // only set status when the status fetched from JobStatusPoller is 
RUNNING,
       // the status of job itself is still in PENDING.
       // Because the status from JobStatusPoller may happen after the job is 
finished.
-      synchronized (job) {
-        if (after == Status.RUNNING && job.getStatus() == Status.PENDING) {
-          job.setStatus(Status.RUNNING);
+      synchronized (this.job) {
+        if (after == Status.RUNNING && this.job.getStatus() == Status.PENDING) 
{
+          this.job.setStatus(Status.RUNNING);
         }
       }
     }
@@ -225,6 +235,7 @@ public class RemoteScheduler extends AbstractScheduler {
   @Override
   public void stop(int stopTimeoutVal, TimeUnit stopTimeoutUnit) {
     super.stop();
+    ExecutorUtil.softShutdown(name, executor, stopTimeoutVal, stopTimeoutUnit);
   }
 
 }
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
index 5240ab906c..3aee8a9c72 100644
--- 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
@@ -260,6 +260,7 @@ public class RemoteSchedulerTest extends 
AbstractInterpreterTest
 
     assertNotNull(job1.getDateFinished());
     assertTrue(job1.isTerminated());
+    assertEquals("1000", job1.getReturn());
     assertNull(job2.getDateFinished());
     assertTrue(job2.isTerminated());
     assertEquals("result2", job2.getReturn());

Reply via email to