This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 9bd81e2  [ZEPPELIN-4613]. Scheduler thread is not closed properly
9bd81e2 is described below

commit 9bd81e238c9539a6d9f9b94b7ac5261af9644acc
Author: Jeff Zhang <zjf...@apache.org>
AuthorDate: Thu Feb 13 10:45:01 2020 +0800

    [ZEPPELIN-4613]. Scheduler thread is not closed properly
    
    ### What is this PR for?
    The root cause is that we didn't shutdown the scheduler thread properly. It 
is always stuck in the following line
    ```
    runningJob = queue.take();
    ```
    
    This PR fix this issue by interrupt that thread. Besidies that I also add 
more logging in this PR.
    
    ### What type of PR is it?
    [Bug Fix ]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4613
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Does the licenses files need update? No
    * Is there breaking changes for older versions? No
    * Does this needs documentation? No
    
    Author: Jeff Zhang <zjf...@apache.org>
    
    Closes #3644 from zjffdu/ZEPPELIN-4613 and squashes the following commits:
    
    2d50bd2a9 [Jeff Zhang] [ZEPPELIN-4613]. Scheduler thread is not closed 
properly
---
 .../org/apache/zeppelin/scheduler/AbstractScheduler.java  | 15 ++++++++++++---
 .../main/java/org/apache/zeppelin/notebook/Paragraph.java |  7 +++++++
 .../org/apache/zeppelin/notebook/scheduler/CronJob.java   |  1 +
 3 files changed, 20 insertions(+), 3 deletions(-)

diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java
index 175dc6a..d6c2352 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java
@@ -40,7 +40,7 @@ public abstract class AbstractScheduler implements Scheduler {
   protected volatile boolean terminate = false;
   protected BlockingQueue<Job> queue = new LinkedBlockingQueue<>();
   protected Map<String, Job> jobs = new ConcurrentHashMap<>();
-
+  private Thread schedulerThread;
 
   public AbstractScheduler(String name) {
     this.name = name;
@@ -63,7 +63,11 @@ public abstract class AbstractScheduler implements Scheduler 
{
   @Override
   public void submit(Job job) {
     job.setStatus(Job.Status.PENDING);
-    queue.add(job);
+    try {
+      queue.put(job);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(String.format("Unable to submit job %s", 
job.getId()), e);
+    }
     jobs.put(job.getId(), job);
   }
 
@@ -76,7 +80,8 @@ public abstract class AbstractScheduler implements Scheduler {
 
   @Override
   public void run() {
-    while (!terminate && !Thread.currentThread().isInterrupted()) {
+    schedulerThread = Thread.currentThread();
+    while (!terminate && !schedulerThread.isInterrupted()) {
       Job runningJob = null;
       try {
         runningJob = queue.take();
@@ -98,6 +103,9 @@ public abstract class AbstractScheduler implements Scheduler 
{
       job.aborted = true;
       job.jobAbort();
     }
+    if (schedulerThread != null) {
+      schedulerThread.interrupt();
+    }
   }
 
   /**
@@ -108,6 +116,7 @@ public abstract class AbstractScheduler implements 
Scheduler {
    */
   protected void runJob(Job runningJob) {
     if (runningJob.isAborted()) {
+      LOGGER.info("Job {} is aborted", runningJob.getId());
       runningJob.setStatus(Job.Status.ABORT);
       runningJob.aborted = false;
       return;
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
index e74a269..803e600 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
@@ -385,6 +385,13 @@ public class Paragraph extends 
JobWithProgressPoller<InterpreterResult> implemen
       setReturn(intpResult, e);
       setStatus(Job.Status.ERROR);
       return false;
+    } catch (Throwable e) {
+      InterpreterResult intpResult =
+              new InterpreterResult(InterpreterResult.Code.ERROR,
+                      "Unexpected exception: " + 
ExceptionUtils.getStackTrace(e));
+      setReturn(intpResult, e);
+      setStatus(Job.Status.ERROR);
+      return false;
     }
   }
 
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJob.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJob.java
index 1d2c3ff..bf15d8b 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJob.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJob.java
@@ -41,6 +41,7 @@ public class CronJob implements org.quartz.Job {
 
     Notebook notebook = (Notebook) jobDataMap.get("notebook");
     String noteId = jobDataMap.getString("noteId");
+    logger.info("Start cron job of note: " + noteId);
     Note note = null;
     try {
       note = notebook.getNote(noteId);

Reply via email to