This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new 9bd81e2 [ZEPPELIN-4613]. Scheduler thread is not closed properly 9bd81e2 is described below commit 9bd81e238c9539a6d9f9b94b7ac5261af9644acc Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Thu Feb 13 10:45:01 2020 +0800 [ZEPPELIN-4613]. Scheduler thread is not closed properly ### What is this PR for? The root cause is that we didn't shut down the scheduler thread properly. It is always stuck on the following line: ``` runningJob = queue.take(); ``` This PR fixes the issue by interrupting that thread. Besides that, I also add more logging in this PR. ### What type of PR is it? [Bug Fix] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-4613 ### How should this be tested? * CI pass ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #3644 from zjffdu/ZEPPELIN-4613 and squashes the following commits: 2d50bd2a9 [Jeff Zhang] [ZEPPELIN-4613]. 
Scheduler thread is not closed properly --- .../org/apache/zeppelin/scheduler/AbstractScheduler.java | 15 ++++++++++++--- .../main/java/org/apache/zeppelin/notebook/Paragraph.java | 7 +++++++ .../org/apache/zeppelin/notebook/scheduler/CronJob.java | 1 + 3 files changed, 20 insertions(+), 3 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java index 175dc6a..d6c2352 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java @@ -40,7 +40,7 @@ public abstract class AbstractScheduler implements Scheduler { protected volatile boolean terminate = false; protected BlockingQueue<Job> queue = new LinkedBlockingQueue<>(); protected Map<String, Job> jobs = new ConcurrentHashMap<>(); - + private Thread schedulerThread; public AbstractScheduler(String name) { this.name = name; @@ -63,7 +63,11 @@ public abstract class AbstractScheduler implements Scheduler { @Override public void submit(Job job) { job.setStatus(Job.Status.PENDING); - queue.add(job); + try { + queue.put(job); + } catch (InterruptedException e) { + throw new RuntimeException(String.format("Unable to submit job %s", job.getId()), e); + } jobs.put(job.getId(), job); } @@ -76,7 +80,8 @@ public abstract class AbstractScheduler implements Scheduler { @Override public void run() { - while (!terminate && !Thread.currentThread().isInterrupted()) { + schedulerThread = Thread.currentThread(); + while (!terminate && !schedulerThread.isInterrupted()) { Job runningJob = null; try { runningJob = queue.take(); @@ -98,6 +103,9 @@ public abstract class AbstractScheduler implements Scheduler { job.aborted = true; job.jobAbort(); } + if (schedulerThread != null) { + schedulerThread.interrupt(); + } } /** @@ -108,6 +116,7 @@ public abstract class 
AbstractScheduler implements Scheduler { */ protected void runJob(Job runningJob) { if (runningJob.isAborted()) { + LOGGER.info("Job {} is aborted", runningJob.getId()); runningJob.setStatus(Job.Status.ABORT); runningJob.aborted = false; return; diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java index e74a269..803e600 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java @@ -385,6 +385,13 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen setReturn(intpResult, e); setStatus(Job.Status.ERROR); return false; + } catch (Throwable e) { + InterpreterResult intpResult = + new InterpreterResult(InterpreterResult.Code.ERROR, + "Unexpected exception: " + ExceptionUtils.getStackTrace(e)); + setReturn(intpResult, e); + setStatus(Job.Status.ERROR); + return false; } } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJob.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJob.java index 1d2c3ff..bf15d8b 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJob.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJob.java @@ -41,6 +41,7 @@ public class CronJob implements org.quartz.Job { Notebook notebook = (Notebook) jobDataMap.get("notebook"); String noteId = jobDataMap.getString("noteId"); + logger.info("Start cron job of note: " + noteId); Note note = null; try { note = notebook.getNote(noteId);