This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new 95b8081 [ZEPPELIN-4614]. Dead lock in ZeppelinServer 95b8081 is described below commit 95b808144e25926c89725e48741c6148ee0e34ef Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Fri Feb 14 09:56:37 2020 +0800 [ZEPPELIN-4614]. Dead lock in ZeppelinServer ### What is this PR for? This PR fixes the deadlock issue in ZeppelinServer. The deadlock happens in Paragraph (see the Jira issue for jstack details). In addition, this PR adds more logging. ### What type of PR is it? [Bug Fix] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-4614 ### How should this be tested? * I ran 2 cron jobs for more than 2 days (each running every 5 minutes), and the deadlock did not occur again ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #3652 from zjffdu/ZEPPELIN-4614 and squashes the following commits: 76f1d1706 [Jeff Zhang] [ZEPPELIN-4614]. 
Dead lock in ZeppelinServer --- .../java/org/apache/zeppelin/scheduler/Job.java | 8 ++--- .../zeppelin/interpreter/InterpreterSetting.java | 2 +- .../java/org/apache/zeppelin/notebook/Note.java | 22 ++++++++++++-- .../zeppelin/notebook/scheduler/CronJob.java | 35 ++++++++++------------ .../notebook/scheduler/QuartzSchedulerService.java | 19 ++++++++---- 5 files changed, 54 insertions(+), 32 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java index c36e371..66bfefa 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java @@ -178,13 +178,13 @@ public abstract class Job<T> { } } - private synchronized void completeWithSuccess(T result) { + private void completeWithSuccess(T result) { setResult(result); exception = null; errorMessage = null; } - private synchronized void completeWithError(Throwable error) { + private void completeWithError(Throwable error) { setException(error); errorMessage = getJobExceptionStack(error); } @@ -201,11 +201,11 @@ public abstract class Job<T> { } } - public synchronized Throwable getException() { + public Throwable getException() { return exception; } - protected synchronized void setException(Throwable t) { + protected void setException(Throwable t) { exception = t; } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java index 156bcb6..632937d 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java @@ -885,7 +885,7 @@ public class InterpreterSetting { // load dependencies List<Dependency> deps = getDependencies(); - if (deps != null) { + if 
(deps != null && !deps.isEmpty()) { LOGGER.info("Start to download dependencies for interpreter: " + name); for (Dependency d : deps) { File destDir = new File( diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java index 966f5ba..ba75a67 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java @@ -911,9 +911,7 @@ public class Note implements JsonSerializable { } public List<Paragraph> getParagraphs() { - synchronized (paragraphs) { - return new LinkedList<>(paragraphs); - } + return this.paragraphs; } // TODO(zjffdu) how does this used ? @@ -1006,6 +1004,24 @@ public class Note implements JsonSerializable { } /** + * Get InterpreterSetting used by the paragraphs of this note. + * @return + */ + public List<InterpreterSetting> getUsedInterpreterSettings() { + Set<InterpreterSetting> settings = new HashSet<>(); + for (Paragraph p : getParagraphs()) { + try { + Interpreter intp = p.getBindedInterpreter(); + settings.add(( + (ManagedInterpreterGroup) intp.getInterpreterGroup()).getInterpreterSetting()); + } catch (InterpreterNotFoundException e) { + // ignore this + } + } + return new ArrayList<>(settings); + } + + /** * Return new note for specific user. this inserts and replaces user paragraph which doesn't * exists in original paragraph * diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJob.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJob.java index bf15d8b..45133f9 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJob.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJob.java @@ -33,28 +33,27 @@ import org.slf4j.LoggerFactory; /** Cron task for the note. 
*/ public class CronJob implements org.quartz.Job { - private static final Logger logger = LoggerFactory.getLogger(CronJob.class); + private static final Logger LOGGER = LoggerFactory.getLogger(CronJob.class); @Override public void execute(JobExecutionContext context) { JobDataMap jobDataMap = context.getJobDetail().getJobDataMap(); - Notebook notebook = (Notebook) jobDataMap.get("notebook"); String noteId = jobDataMap.getString("noteId"); - logger.info("Start cron job of note: " + noteId); + LOGGER.info("Start cron job of note: " + noteId); Note note = null; try { note = notebook.getNote(noteId); if (note == null) { - logger.warn("Note " + noteId + " not found"); + LOGGER.warn("Skip cron job of note: " + noteId + ", because it is not found"); return; } } catch (IOException e) { - logger.warn("Fail to get note: " + noteId, e); + LOGGER.warn("Skip cron job of note: " + noteId + ", because fail to get it", e); return; } if (note.haveRunningOrPendingParagraphs()) { - logger.warn( + LOGGER.warn( "execution of the cron job is skipped because there is a running or pending " + "paragraph (note id: {})", noteId); @@ -62,7 +61,7 @@ public class CronJob implements org.quartz.Job { } if (!note.isCronSupported(notebook.getConf())) { - logger.warn("execution of the cron job is skipped cron is not enabled from Zeppelin server"); + LOGGER.warn("execution of the cron job is skipped cron is not enabled from Zeppelin server"); return; } @@ -70,19 +69,17 @@ public class CronJob implements org.quartz.Job { boolean releaseResource = false; String cronExecutingUser = null; - try { - Map<String, Object> config = note.getConfig(); - if (config != null) { - if (config.containsKey("releaseresource")) { - releaseResource = (boolean) config.get("releaseresource"); - } - cronExecutingUser = (String) config.get("cronExecutingUser"); + Map<String, Object> config = note.getConfig(); + if (config != null) { + if (config.containsKey("releaseresource")) { + releaseResource = (boolean) 
config.get("releaseresource"); } - } catch (ClassCastException e) { - logger.error(e.getMessage(), e); + cronExecutingUser = (String) config.get("cronExecutingUser"); } + if (releaseResource) { - for (InterpreterSetting setting : note.getBindedInterpreterSettings()) { + LOGGER.info("Releasing interpreters used by this note: " + noteId); + for (InterpreterSetting setting : note.getUsedInterpreterSettings()) { try { notebook .getInterpreterSettingManager() @@ -91,7 +88,7 @@ public class CronJob implements org.quartz.Job { noteId, cronExecutingUser != null ? cronExecutingUser : "anonymous"); } catch (InterpreterException e) { - logger.error("Fail to restart interpreter: " + setting.getId(), e); + LOGGER.error("Fail to restart interpreter: " + setting.getId(), e); } } } @@ -111,7 +108,7 @@ public class CronJob implements org.quartz.Job { try { note.runAll(authenticationInfo, true); } catch (Exception e) { - logger.warn("Fail to run note", e); + LOGGER.warn("Fail to run note: " + note.getName(), e); } } } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/QuartzSchedulerService.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/QuartzSchedulerService.java index bceeb9c..cf1c0a3 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/QuartzSchedulerService.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/QuartzSchedulerService.java @@ -76,24 +76,32 @@ public class QuartzSchedulerService implements SchedulerService { try { note = notebook.getNote(noteId); } catch (IOException e) { - LOGGER.warn("Fail to get note: " + noteId, e); + LOGGER.warn("Skip refresh cron of note: " + noteId + " because fail to get it", e); return; } - if (note == null || note.isTrash()) { + if (note == null) { + LOGGER.warn("Skip refresh cron of note: " + noteId + " because there's no such note"); return; } + if (note.isTrash()) { + LOGGER.warn("Skip refresh cron of note: " + noteId 
+ " because it is in trash"); + return; + } + Map<String, Object> config = note.getConfig(); if (config == null) { + LOGGER.warn("Skip refresh cron of note: " + noteId + " because its config is empty."); return; } if (!note.isCronSupported(zeppelinConfiguration)) { - LOGGER.warn("execution of the cron job is skipped cron is not enabled from Zeppelin server"); + LOGGER.warn("Skip refresh cron of note " + noteId + " because its cron is not enabled."); return; } String cronExpr = (String) note.getConfig().get("cron"); if (cronExpr == null || cronExpr.trim().length() == 0) { + LOGGER.warn("Skip refresh cron of note " + noteId + " because its cron expression is empty."); return; } @@ -122,16 +130,17 @@ public class QuartzSchedulerService implements SchedulerService { .forJob(noteId, "note") .build(); } catch (Exception e) { - LOGGER.error("Error", e); + LOGGER.error("Fail to create cron trigger for note: " + note.getName(), e); info.put("cron", e.getMessage()); } try { if (trigger != null) { + LOGGER.info("Trigger cron for note: " + note.getName() + ", with cron expression: " + cronExpr); scheduler.scheduleJob(newJob, trigger); } } catch (SchedulerException e) { - LOGGER.error("Error", e); + LOGGER.error("Fail to schedule cron job for note: " + note.getName(), e); info.put("cron", "Scheduler Exception"); } }