This is an automated email from the ASF dual-hosted git repository. pdallig pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new 8ddbd90868 [ZEPPELIN-5754] Init Notes in background (#4382) 8ddbd90868 is described below commit 8ddbd908687b826722673139f37fd2dde2c95ce5 Author: Philipp Dallig <philipp.dal...@gmail.com> AuthorDate: Wed Jul 6 09:35:55 2022 +0200 [ZEPPELIN-5754] Init Notes in background (#4382) * Remove generic wildcard type in SchedulerService * Use a TriggerListener to check for running or pending paragraphs * Init-Jobs in background * Be safe when initNotebooks is called multiple times --- .../org/apache/zeppelin/util/ExecutorUtil.java | 4 +- .../org/apache/zeppelin/server/ZeppelinServer.java | 2 + .../apache/zeppelin/cluster/ClusterEventTest.java | 5 +- .../zeppelin/service/NotebookServiceTest.java | 4 +- .../org/apache/zeppelin/notebook/Notebook.java | 75 ++++++++- .../zeppelin/notebook/scheduler/CronJob.java | 9 -- ...JobListener.java => MetricCronJobListener.java} | 10 +- .../notebook/scheduler/NoSchedulerService.java | 7 +- .../notebook/scheduler/QuartzSchedulerService.java | 170 +++++++++------------ .../notebook/scheduler/SchedulerService.java | 16 +- .../scheduler/ZeppelinCronJobTriggerListerner.java | 61 ++++++++ .../org/apache/zeppelin/search/LuceneSearch.java | 20 +-- .../org/apache/zeppelin/notebook/NotebookTest.java | 7 +- 13 files changed, 236 insertions(+), 154 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/ExecutorUtil.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/ExecutorUtil.java index 1e8187781f..27ba854a81 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/ExecutorUtil.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/ExecutorUtil.java @@ -17,6 +17,7 @@ package org.apache.zeppelin.util; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -38,7 +39,8 @@ public class ExecutorUtil { // Wait a while for existing tasks to terminate 
if (!executor.awaitTermination(stopTimeoutVal, stopTimeoutUnit)) { LOGGER.warn("{} was not shut down in the given time {} {} - interrupting now", name, stopTimeoutVal, stopTimeoutUnit); - executor.shutdownNow(); // Cancel currently executing tasks + List<Runnable> neverExecuted = executor.shutdownNow(); // Cancel currently executing tasks + LOGGER.warn("{} tasks were never executed from {}.", neverExecuted.size(), name); // Wait a while for tasks to respond to being cancelled if (!executor.awaitTermination(stopTimeoutVal, stopTimeoutUnit)) { LOGGER.error("executor {} did not terminate", name); diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java index 7cb97a45a6..ae6846d554 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java @@ -260,6 +260,8 @@ public class ZeppelinServer extends ResourceConfig { sharedServiceLocator, SearchService.class.getName()); ServiceLocatorUtilities.getService( sharedServiceLocator, SchedulerService.class.getName()); + // Initialization of the Notes in the notebook asynchronously + notebook.initNotebook(); // Try to recover here, don't do it in constructor of Notebook, because it would cause deadlock. 
notebook.recoveryIfNecessary(); diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterEventTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterEventTest.java index cd16bf8cb7..22dfa434b7 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterEventTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterEventTest.java @@ -32,7 +32,6 @@ import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; import org.apache.zeppelin.interpreter.thrift.ParagraphInfo; import org.apache.zeppelin.interpreter.thrift.ServiceException; import org.apache.zeppelin.notebook.AuthorizationService; -import org.apache.zeppelin.notebook.Note; import org.apache.zeppelin.notebook.Notebook; import org.apache.zeppelin.notebook.Paragraph; import org.apache.zeppelin.notebook.scheduler.QuartzSchedulerService; @@ -63,6 +62,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -101,7 +101,8 @@ public class ClusterEventTest extends ZeppelinServerMock { authorizationService = TestUtils.getInstance(AuthorizationService.class); schedulerService = new QuartzSchedulerService(zconf, notebook); - schedulerService.waitForFinishInit(); + notebook.initNotebook(); + notebook.waitForFinishInit(1, TimeUnit.MINUTES); notebookServer = spy(NotebookServer.getInstance()); notebookService = new NotebookService(notebook, authorizationService, zconf, schedulerService); diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java index 9e5489edb6..d6e81f18cf 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java +++ 
b/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java @@ -39,6 +39,7 @@ import java.nio.file.Files; import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -129,7 +130,8 @@ public class NotebookServiceTest { null); searchService = new LuceneSearch(zeppelinConfiguration, notebook); QuartzSchedulerService schedulerService = new QuartzSchedulerService(zeppelinConfiguration, notebook); - schedulerService.waitForFinishInit(); + notebook.initNotebook(); + notebook.waitForFinishInit(1, TimeUnit.MINUTES); notebookService = new NotebookService( notebook, authorizationService, zeppelinConfiguration, schedulerService); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java index e2d913dc79..52bad06890 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java @@ -28,7 +28,12 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.function.Function; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Predicate; import java.util.stream.Collectors; import javax.inject.Inject; @@ -51,8 +56,10 @@ import org.apache.zeppelin.notebook.repo.NotebookRepoSync; import org.apache.zeppelin.notebook.repo.NotebookRepoWithVersionControl; import org.apache.zeppelin.notebook.repo.NotebookRepoWithVersionControl.Revision; import org.apache.zeppelin.scheduler.Job; +import org.apache.zeppelin.scheduler.SchedulerThreadFactory; import 
org.apache.zeppelin.user.AuthenticationInfo; import org.apache.zeppelin.user.Credentials; +import org.apache.zeppelin.util.ExecutorUtil; import org.quartz.SchedulerException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,6 +82,8 @@ public class Notebook { private NotebookRepo notebookRepo; private List<NoteEventListener> noteEventListeners = new CopyOnWriteArrayList<>(); private Credentials credentials; + private final List<Consumer<String>> initConsumers; + private ExecutorService initExecutor; /** * Main constructor \w manual Dependency Injection @@ -101,6 +110,7 @@ public class Notebook { this.interpreterSettingManager.setNotebook(this); this.credentials = credentials; addNotebookEventListener(this.interpreterSettingManager); + initConsumers = new LinkedList<>(); } public void recoveryIfNecessary() { @@ -109,6 +119,62 @@ public class Notebook { } } + /** + * Subsystems can add a consumer, which is executed during initialization. + * Initialization is performed in parallel and with multiple threads. + * + * The consumer must be thread safe. + * + * @param initConsumer Consumer, which is passed the NoteId. + */ + public void addInitConsumer(Consumer<String> initConsumer) { + this.initConsumers.add(initConsumer); + } + + /** + * Asynchronous and parallel initialization of notes (during startup) + */ + public void initNotebook() { + if (initExecutor == null || initExecutor.isShutdown() || initExecutor.isTerminated()) { + initExecutor = new ThreadPoolExecutor(0, Runtime.getRuntime().availableProcessors(), 1, TimeUnit.MINUTES, + new LinkedBlockingQueue<>(), new SchedulerThreadFactory("NotebookInit")); + } + for (NoteInfo noteInfo : getNotesInfo()) { + initExecutor.execute(() -> { + for (Consumer<String> initConsumer : initConsumers) { + initConsumer.accept(noteInfo.getId()); + } + }); + } + } + + /** + * Blocks until all init jobs have completed execution, + * or the timeout occurs, or the current thread is + * interrupted, whichever happens first. 
+ * + * @param timeout the maximum time to wait + * @param unit the time unit of the timeout argument + * @return {@code true} if this initJobs terminated or no initialization is active + * {@code false} if the timeout elapsed before termination or the wait is interrupted + */ + public boolean waitForFinishInit(long timeout, TimeUnit unit) { + if (initExecutor == null) { + return true; + } + try { + initExecutor.shutdown(); + return initExecutor.awaitTermination(timeout, unit); + } catch (InterruptedException e) { + LOGGER.warn("Notebook Init-Job interrupted!", e); + // (Re-)Cancel if current thread also interrupted + initExecutor.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); + } + return false; + } + private void recoverRunningParagraphs() { Thread thread = new Thread(() -> getNotesInfo().forEach(noteInfo -> { @@ -683,14 +749,14 @@ public class Notebook { .collect(Collectors.toList()); } - public List<NoteInfo> getNotesInfo(Function<String, Boolean> func) { + public List<NoteInfo> getNotesInfo(Predicate<String> func) { String homescreenNoteId = conf.getString(ConfVars.ZEPPELIN_NOTEBOOK_HOMESCREEN); boolean hideHomeScreenNotebookFromList = conf.getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_HOMESCREEN_HIDE); synchronized (noteManager.getNotesInfo()) { List<NoteInfo> notesInfo = noteManager.getNotesInfo().entrySet().stream().filter(entry -> - func.apply(entry.getKey()) && + func.test(entry.getKey()) && ((!hideHomeScreenNotebookFromList) || ((hideHomeScreenNotebookFromList) && !entry.getKey().equals(homescreenNoteId)))) .map(entry -> new NoteInfo(entry.getKey(), entry.getValue())) @@ -752,6 +818,9 @@ public class Notebook { } public void close() { + if (initExecutor != null) { + ExecutorUtil.softShutdown("NotebookInit", initExecutor, 1, TimeUnit.MINUTES); + } this.notebookRepo.close(); } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJob.java 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJob.java index 44463a1661..5ba06f77ab 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJob.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJob.java @@ -34,7 +34,6 @@ public class CronJob implements org.quartz.Job { private static final String RESULT_SUCCEEDED = "succeeded"; private static final String RESULT_FAILED = "failed"; - private static final String RESULT_SKIPPED = "skipped"; @Override public void execute(JobExecutionContext context) { @@ -49,14 +48,6 @@ public class CronJob implements org.quartz.Job { context.setResult(RESULT_FAILED); return null; } - if (note.haveRunningOrPendingParagraphs()) { - LOGGER.warn( - "execution of the cron job is skipped because there is a running or pending " - + "paragraph (note id: {})", - note.getId()); - context.setResult(RESULT_SKIPPED); - return null; - } String cronExecutingUser = (String) note.getConfig().get("cronExecutingUser"); String cronExecutingRoles = (String) note.getConfig().get("cronExecutingRoles"); if (null == cronExecutingUser) { diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJobListener.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/MetricCronJobListener.java similarity index 88% rename from zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJobListener.java rename to zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/MetricCronJobListener.java index 6a6deb41eb..d7a92909d5 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJobListener.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/MetricCronJobListener.java @@ -35,12 +35,12 @@ import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.Tags; import io.micrometer.core.instrument.Timer; -public class 
CronJobListener implements JobListener { +public class MetricCronJobListener implements JobListener { - private static final Logger LOGGER = LoggerFactory.getLogger(CronJobListener.class); + private static final Logger LOGGER = LoggerFactory.getLogger(MetricCronJobListener.class); // JobExecutionContext -> Timer.Sample - private Map<JobExecutionContext, Timer.Sample> cronJobTimerSamples = new HashMap<>(); + private final Map<JobExecutionContext, Timer.Sample> cronJobTimerSamples = new HashMap<>(); @Override public String getName() { @@ -57,9 +57,7 @@ public class CronJobListener implements JobListener { @Override public void jobExecutionVetoed(JobExecutionContext context) { - JobDataMap jobDataMap = context.getJobDetail().getJobDataMap(); - String noteId = jobDataMap.getString("noteId"); - LOGGER.info("vetoed cron job of note: {}", noteId); + // do nothing } @Override diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/NoSchedulerService.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/NoSchedulerService.java index 091b49efcf..3e55df2f1a 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/NoSchedulerService.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/NoSchedulerService.java @@ -17,9 +17,6 @@ package org.apache.zeppelin.notebook.scheduler; -import java.util.Collections; -import java.util.Set; - public class NoSchedulerService implements SchedulerService { @Override public boolean refreshCron(String noteId) { @@ -27,7 +24,7 @@ public class NoSchedulerService implements SchedulerService { } @Override - public Set<?> getJobs() { - return Collections.emptySet(); + public int getJobsSize() { + return 0; } } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/QuartzSchedulerService.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/QuartzSchedulerService.java index eb3fce165b..53cf4225da 
100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/QuartzSchedulerService.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/QuartzSchedulerService.java @@ -18,19 +18,12 @@ package org.apache.zeppelin.notebook.scheduler; import java.io.IOException; -import java.util.Collections; import java.util.Map; -import java.util.Set; - import javax.inject.Inject; -import com.google.common.annotations.VisibleForTesting; - import org.apache.commons.lang3.StringUtils; import org.apache.zeppelin.conf.ZeppelinConfiguration; -import org.apache.zeppelin.notebook.Note; import org.apache.zeppelin.notebook.Notebook; -import org.apache.zeppelin.user.AuthenticationInfo; import org.quartz.CronScheduleBuilder; import org.quartz.CronTrigger; import org.quartz.JobBuilder; @@ -51,7 +44,6 @@ public class QuartzSchedulerService implements SchedulerService { private final ZeppelinConfiguration zeppelinConfiguration; private final Notebook notebook; private final Scheduler scheduler; - private final Thread loadingNotesThread; @Inject public QuartzSchedulerService(ZeppelinConfiguration zeppelinConfiguration, Notebook notebook) @@ -59,115 +51,93 @@ public class QuartzSchedulerService implements SchedulerService { this.zeppelinConfiguration = zeppelinConfiguration; this.notebook = notebook; this.scheduler = getScheduler(); - this.scheduler.getListenerManager().addJobListener(new CronJobListener()); + this.scheduler.getListenerManager().addJobListener(new MetricCronJobListener()); + this.scheduler.getListenerManager().addTriggerListener(new ZeppelinCronJobTriggerListerner()); this.scheduler.start(); - - // Do in a separated thread because there may be many notes, - // loop all notes in the main thread may block the restarting of Zeppelin server - // TODO(zjffdu) It may cause issue when user delete note before this thread is finished - this.loadingNotesThread = new Thread(() -> { - LOGGER.info("Starting init cronjobs"); - 
notebook.getNotesInfo().stream() - .forEach(entry -> { - try { - refreshCron(entry.getId()); - } catch (Exception e) { - LOGGER.warn("Fail to refresh cron for note: {}", entry.getId()); - } - }); - LOGGER.info("Complete init cronjobs"); - }); - loadingNotesThread.setName("Init CronJob Thread"); - loadingNotesThread.setDaemon(true); - loadingNotesThread.start(); + // Start Init + notebook.addInitConsumer(this::refreshCron); } + private Scheduler getScheduler() throws SchedulerException { return new StdSchedulerFactory().getScheduler(); } - /** - * This is only for testing, unit test should always call this method in setup() before testing. - */ - @VisibleForTesting - public void waitForFinishInit() { - try { - loadingNotesThread.join(); - } catch (InterruptedException e) { - LOGGER.warn("Unexpected exception", e); - } - } - @Override - public boolean refreshCron(String noteId) throws IOException { + public boolean refreshCron(String noteId) { removeCron(noteId); - return notebook.processNote(noteId, - note -> { - if (note == null) { - LOGGER.warn("Skip refresh cron of note: {} because there's no such note", noteId); - return false; - } - if (note.isTrash()) { - LOGGER.warn("Skip refresh cron of note: {} because it is in trash", noteId); - return false; - } - Map<String, Object> config = note.getConfig(); - if (config == null) { - LOGGER.warn("Skip refresh cron of note: {} because its config is empty.", noteId); - return false; - } - if (!note.isCronSupported(zeppelinConfiguration)) { - LOGGER.warn("Skip refresh cron of note {} because its cron is not enabled.", noteId); - return false; - } - String cronExpr = (String) note.getConfig().get("cron"); - if (cronExpr == null || cronExpr.trim().length() == 0) { - LOGGER.warn("Skip refresh cron of note {} because its cron expression is empty.", noteId); - return false; - } - JobDataMap jobDataMap = new JobDataMap(); - jobDataMap.put("notebook", notebook); - jobDataMap.put("noteId", noteId); - JobDetail newJob = - 
JobBuilder.newJob(CronJob.class) - .withIdentity(noteId, "note") - .setJobData(jobDataMap) - .build(); - Map<String, Object> info = note.getInfo(); - info.put("cron", null); - CronTrigger trigger = null; - try { - trigger = - TriggerBuilder.newTrigger() - .withIdentity("trigger_" + noteId, "note") - .withSchedule(CronScheduleBuilder.cronSchedule(cronExpr)) - .forJob(noteId, "note") + try { + return notebook.processNote(noteId, + note -> { + if (note == null) { + LOGGER.warn("Skip refresh cron of note: {} because there's no such note", noteId); + return false; + } + if (note.isTrash()) { + LOGGER.warn("Skip refresh cron of note: {} because it is in trash", noteId); + return false; + } + Map<String, Object> config = note.getConfig(); + if (config == null) { + LOGGER.warn("Skip refresh cron of note: {} because its config is empty.", noteId); + return false; + } + if (!note.isCronSupported(zeppelinConfiguration)) { + LOGGER.warn("Skip refresh cron of note {} because its cron is not enabled.", noteId); + return false; + } + String cronExpr = (String) note.getConfig().get("cron"); + if (StringUtils.isBlank(cronExpr)) { + LOGGER.warn("Skip refresh cron of note: {} because its cron expression is empty.", noteId); + return false; + } + JobDataMap jobDataMap = new JobDataMap(); + jobDataMap.put("notebook", notebook); + jobDataMap.put("noteId", noteId); + JobDetail newJob = + JobBuilder.newJob(CronJob.class) + .withIdentity(noteId, "note") + .setJobData(jobDataMap) .build(); - } catch (Exception e) { - LOGGER.error("Fail to create cron trigger for note: {}", noteId, e); - info.put("cron", e.getMessage()); - return false; - } + Map<String, Object> info = note.getInfo(); + info.put("cron", null); + CronTrigger trigger = null; + try { + trigger = + TriggerBuilder.newTrigger() + .withIdentity("trigger_" + noteId, "note") + .withSchedule(CronScheduleBuilder.cronSchedule(cronExpr)) + .forJob(noteId, "note") + .build(); + } catch (Exception e) { + LOGGER.error("Fail to create cron 
trigger for note: {}", noteId, e); + info.put("cron", e.getMessage()); + return false; + } - try { - LOGGER.info("Trigger cron for note: {}, with cron expression: {}", noteId, cronExpr); - scheduler.scheduleJob(newJob, trigger); - return true; - } catch (SchedulerException e) { - LOGGER.error("Fail to schedule cron job for note: {}", noteId, e); - info.put("cron", "Scheduler Exception"); - return false; - } - }); + try { + LOGGER.info("Trigger cron for note: {}, with cron expression: {}", noteId, cronExpr); + scheduler.scheduleJob(newJob, trigger); + return true; + } catch (SchedulerException e) { + LOGGER.error("Fail to schedule cron job for note: {}", noteId, e); + info.put("cron", "Scheduler Exception"); + return false; + } + }); + } catch (IOException e) { + LOGGER.error("Fail to schedule cron job for note: {}", noteId, e); + return false; + } } @Override - public Set<?> getJobs() { + public int getJobsSize() { try { - return scheduler.getJobKeys(GroupMatcher.anyGroup()); + return scheduler.getJobKeys(GroupMatcher.anyGroup()).size(); } catch (SchedulerException e) { LOGGER.error("Error while getting jobKeys", e); - return Collections.emptySet(); + return 0; } } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/SchedulerService.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/SchedulerService.java index 535ddf0455..ac4c07607f 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/SchedulerService.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/SchedulerService.java @@ -17,10 +17,16 @@ package org.apache.zeppelin.notebook.scheduler; -import java.io.IOException; -import java.util.Set; - public interface SchedulerService { - boolean refreshCron(String noteId) throws IOException; - Set<?> getJobs(); + + /** + * @param noteId + * @return true if the cron refresh was successful + */ + boolean refreshCron(String noteId); + + /** + * @return
size of queued jobs + */ + int getJobsSize(); } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/ZeppelinCronJobTriggerListerner.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/ZeppelinCronJobTriggerListerner.java new file mode 100644 index 0000000000..1663f7373a --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/ZeppelinCronJobTriggerListerner.java @@ -0,0 +1,61 @@ +package org.apache.zeppelin.notebook.scheduler; + +import java.io.IOException; + +import org.apache.zeppelin.notebook.Notebook; +import org.quartz.JobDataMap; +import org.quartz.JobExecutionContext; +import org.quartz.Trigger; +import org.quartz.Trigger.CompletedExecutionInstruction; +import org.quartz.TriggerListener; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ZeppelinCronJobTriggerListerner implements TriggerListener { + + private static final Logger LOGGER = LoggerFactory.getLogger(ZeppelinCronJobTriggerListerner.class); + + @Override + public String getName() { + return "ZeppelinCronJobTriggerListerner"; + } + + @Override + public void triggerFired(Trigger trigger, JobExecutionContext context) { + // Do nothing + } + + @Override + public boolean vetoJobExecution(Trigger trigger, JobExecutionContext context) { + JobDataMap jobDataMap = context.getJobDetail().getJobDataMap(); + String noteId = jobDataMap.getString("noteId"); + Notebook notebook = (Notebook) jobDataMap.get("notebook"); + try { + return notebook.processNote(noteId, note -> { + if (note.haveRunningOrPendingParagraphs()) { + LOGGER.warn( + "execution of the cron job is skipped because there is a running or pending paragraph (noteId: {})", + noteId); + return true; + } + return false; + }); + } catch (IOException e) { + LOGGER.warn("Failed to check CronJob of note: {} because fail to get it", noteId ,e); + return true; + } + } + + @Override + public void triggerMisfired(Trigger trigger) { + // Do nothing 
+ } + + @Override + public void triggerComplete(Trigger trigger, JobExecutionContext context, + CompletedExecutionInstruction triggerInstructionCode) { + // Do nothing + } + +} diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/LuceneSearch.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/LuceneSearch.java index fe420edf8f..ff4f60ef6e 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/LuceneSearch.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/LuceneSearch.java @@ -104,7 +104,7 @@ public class LuceneSearch extends SearchService { throw new IOException("Failed to create new IndexWriter", e); } if (conf.isIndexRebuild()) { - startRebuildIndex(); + notebook.addInitConsumer(this::addNoteIndex); } this.notebook.addNotebookEventListener(this); } @@ -442,22 +442,4 @@ public class LuceneSearch extends SearchService { } updateDoc(noteId, noteName, null); } - - public void startRebuildIndex() { - Thread thread = new Thread(() -> { - LOGGER.info("Starting rebuild index"); - notebook.getNotesInfo().forEach(noteInfo -> { - addNoteIndex(noteInfo.getId()); - }); - LOGGER.info("Finish rebuild index"); - }); - thread.setName("LuceneSearch-RebuildIndex-Thread"); - thread.start(); - try { - thread.join(); - } catch (InterruptedException e) { - LOGGER.warn("Lucene Rebuild Index Thread interrupted!", e); - Thread.currentThread().interrupt(); - } - } } diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java index 8715f0fa8f..5869f3b5dd 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java @@ -104,7 +104,8 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo notebook = new Notebook(conf, authorizationService, notebookRepo, noteManager, 
interpreterFactory, interpreterSettingManager, credentials, null); notebook.setParagraphJobListener(this); schedulerService = new QuartzSchedulerService(conf, notebook); - schedulerService.waitForFinishInit(); + notebook.initNotebook(); + notebook.waitForFinishInit(1, TimeUnit.MINUTES); } @Override @@ -991,9 +992,9 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo note.setConfig(config); return null; }); - final int jobsBeforeRefresh = schedulerService.getJobs().size(); + final int jobsBeforeRefresh = schedulerService.getJobsSize(); schedulerService.refreshCron(noteId); - final int jobsAfterRefresh = schedulerService.getJobs().size(); + final int jobsAfterRefresh = schedulerService.getJobsSize(); assertEquals(jobsBeforeRefresh, jobsAfterRefresh);