This is an automated email from the ASF dual-hosted git repository.

pdallig pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ddbd90868 [ZEPPELIN-5754] Init Notes in background (#4382)
8ddbd90868 is described below

commit 8ddbd908687b826722673139f37fd2dde2c95ce5
Author: Philipp Dallig <philipp.dal...@gmail.com>
AuthorDate: Wed Jul 6 09:35:55 2022 +0200

    [ZEPPELIN-5754] Init Notes in background (#4382)
    
    * Remove generic wildcard type in SchedulerService
    
    * Use a TriggerListener to check for running or pending paragraphs
    
    * Init-Jobs in background
    
    * Be safe when initNotebooks is called multiple times
---
 .../org/apache/zeppelin/util/ExecutorUtil.java     |   4 +-
 .../org/apache/zeppelin/server/ZeppelinServer.java |   2 +
 .../apache/zeppelin/cluster/ClusterEventTest.java  |   5 +-
 .../zeppelin/service/NotebookServiceTest.java      |   4 +-
 .../org/apache/zeppelin/notebook/Notebook.java     |  75 ++++++++-
 .../zeppelin/notebook/scheduler/CronJob.java       |   9 --
 ...JobListener.java => MetricCronJobListener.java} |  10 +-
 .../notebook/scheduler/NoSchedulerService.java     |   7 +-
 .../notebook/scheduler/QuartzSchedulerService.java | 170 +++++++++------------
 .../notebook/scheduler/SchedulerService.java       |  16 +-
 .../scheduler/ZeppelinCronJobTriggerListerner.java |  61 ++++++++
 .../org/apache/zeppelin/search/LuceneSearch.java   |  20 +--
 .../org/apache/zeppelin/notebook/NotebookTest.java |   7 +-
 13 files changed, 236 insertions(+), 154 deletions(-)

diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/ExecutorUtil.java 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/ExecutorUtil.java
index 1e8187781f..27ba854a81 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/ExecutorUtil.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/ExecutorUtil.java
@@ -17,6 +17,7 @@
 
 package org.apache.zeppelin.util;
 
+import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -38,7 +39,8 @@ public class ExecutorUtil {
       // Wait a while for existing tasks to terminate
       if (!executor.awaitTermination(stopTimeoutVal, stopTimeoutUnit)) {
         LOGGER.warn("{} was not shut down in the given time {} {} - 
interrupting now", name, stopTimeoutVal, stopTimeoutUnit);
-        executor.shutdownNow(); // Cancel currently executing tasks
+        List<Runnable> neverExecuted = executor.shutdownNow(); // Cancel 
currently executing tasks
+        LOGGER.warn("{} tasks were never executed from {}.", 
neverExecuted.size(), name);
         // Wait a while for tasks to respond to being cancelled
         if (!executor.awaitTermination(stopTimeoutVal, stopTimeoutUnit)) {
           LOGGER.error("executor {} did not terminate", name);
diff --git 
a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java 
b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
index 7cb97a45a6..ae6846d554 100644
--- 
a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
+++ 
b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
@@ -260,6 +260,8 @@ public class ZeppelinServer extends ResourceConfig {
       sharedServiceLocator, SearchService.class.getName());
     ServiceLocatorUtilities.getService(
       sharedServiceLocator, SchedulerService.class.getName());
+    // Initialization of the Notes in the notebook asynchronously
+    notebook.initNotebook();
     // Try to recover here, don't do it in constructor of Notebook, because it 
would cause deadlock.
     notebook.recoveryIfNecessary();
 
diff --git 
a/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterEventTest.java
 
b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterEventTest.java
index cd16bf8cb7..22dfa434b7 100644
--- 
a/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterEventTest.java
+++ 
b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterEventTest.java
@@ -32,7 +32,6 @@ import 
org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
 import org.apache.zeppelin.interpreter.thrift.ParagraphInfo;
 import org.apache.zeppelin.interpreter.thrift.ServiceException;
 import org.apache.zeppelin.notebook.AuthorizationService;
-import org.apache.zeppelin.notebook.Note;
 import org.apache.zeppelin.notebook.Notebook;
 import org.apache.zeppelin.notebook.Paragraph;
 import org.apache.zeppelin.notebook.scheduler.QuartzSchedulerService;
@@ -63,6 +62,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -101,7 +101,8 @@ public class ClusterEventTest extends ZeppelinServerMock {
     authorizationService = TestUtils.getInstance(AuthorizationService.class);
 
     schedulerService = new QuartzSchedulerService(zconf, notebook);
-    schedulerService.waitForFinishInit();
+    notebook.initNotebook();
+    notebook.waitForFinishInit(1, TimeUnit.MINUTES);
     notebookServer = spy(NotebookServer.getInstance());
     notebookService = new NotebookService(notebook, authorizationService, 
zconf, schedulerService);
 
diff --git 
a/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java
 
b/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java
index 9e5489edb6..d6e81f18cf 100644
--- 
a/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java
+++ 
b/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java
@@ -39,6 +39,7 @@ import java.nio.file.Files;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -129,7 +130,8 @@ public class NotebookServiceTest {
             null);
     searchService = new LuceneSearch(zeppelinConfiguration, notebook);
     QuartzSchedulerService schedulerService = new 
QuartzSchedulerService(zeppelinConfiguration, notebook);
-    schedulerService.waitForFinishInit();
+    notebook.initNotebook();
+    notebook.waitForFinishInit(1, TimeUnit.MINUTES);
     notebookService =
         new NotebookService(
             notebook, authorizationService, zeppelinConfiguration, 
schedulerService);
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
index e2d913dc79..52bad06890 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
@@ -28,7 +28,12 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.function.Function;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import javax.inject.Inject;
 
@@ -51,8 +56,10 @@ import org.apache.zeppelin.notebook.repo.NotebookRepoSync;
 import org.apache.zeppelin.notebook.repo.NotebookRepoWithVersionControl;
 import 
org.apache.zeppelin.notebook.repo.NotebookRepoWithVersionControl.Revision;
 import org.apache.zeppelin.scheduler.Job;
+import org.apache.zeppelin.scheduler.SchedulerThreadFactory;
 import org.apache.zeppelin.user.AuthenticationInfo;
 import org.apache.zeppelin.user.Credentials;
+import org.apache.zeppelin.util.ExecutorUtil;
 import org.quartz.SchedulerException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -75,6 +82,8 @@ public class Notebook {
   private NotebookRepo notebookRepo;
   private List<NoteEventListener> noteEventListeners = new 
CopyOnWriteArrayList<>();
   private Credentials credentials;
+  private final List<Consumer<String>> initConsumers;
+  private ExecutorService initExecutor;
 
   /**
    * Main constructor \w manual Dependency Injection
@@ -101,6 +110,7 @@ public class Notebook {
     this.interpreterSettingManager.setNotebook(this);
     this.credentials = credentials;
     addNotebookEventListener(this.interpreterSettingManager);
+    initConsumers = new LinkedList<>();
   }
 
   public void recoveryIfNecessary() {
@@ -109,6 +119,62 @@ public class Notebook {
     }
   }
 
+  /**
+   * Subsystems can add a consumer, which is executed during initialization.
+   * Initialization is performed in parallel and with multiple threads.
+   *
+   * The consumer must be thread safe.
+   *
+   * @param initConsumer Consumer, which is passed the NoteId.
+   */
+  public void addInitConsumer(Consumer<String> initConsumer) {
+    this.initConsumers.add(initConsumer);
+  }
+
+  /**
+   * Asynchronous and parallel initialization of notes (during startup)
+   */
+  public void initNotebook() {
+    if (initExecutor == null || initExecutor.isShutdown() || 
initExecutor.isTerminated()) {
+      initExecutor = new ThreadPoolExecutor(0, 
Runtime.getRuntime().availableProcessors(), 1, TimeUnit.MINUTES,
+                     new LinkedBlockingQueue<>(), new 
SchedulerThreadFactory("NotebookInit"));
+    }
+    for (NoteInfo noteInfo : getNotesInfo()) {
+      initExecutor.execute(() -> {
+        for (Consumer<String> initConsumer : initConsumers) {
+          initConsumer.accept(noteInfo.getId());
+        }
+      });
+    }
+  }
+
+  /**
+   * Blocks until all init jobs have completed execution,
+   * or the timeout occurs, or the current thread is
+   * interrupted, whichever happens first.
+   *
+   * @param timeout the maximum time to wait
+   * @param unit the time unit of the timeout argument
+   * @return {@code true} if this initJobs terminated or no initialization is 
active
+   *         {@code false} if the timeout elapsed before termination or the 
wait is interrupted
+   */
+  public boolean waitForFinishInit(long timeout, TimeUnit unit) {
+    if (initExecutor == null) {
+      return true;
+    }
+    try {
+      initExecutor.shutdown();
+      return initExecutor.awaitTermination(timeout, unit);
+    } catch (InterruptedException e) {
+      LOGGER.warn("Notebook Init-Job interrupted!", e);
+      // (Re-)Cancel if current thread also interrupted
+      initExecutor.shutdownNow();
+      // Preserve interrupt status
+      Thread.currentThread().interrupt();
+    }
+    return false;
+  }
+
   private void recoverRunningParagraphs() {
     Thread thread = new Thread(() ->
       getNotesInfo().forEach(noteInfo -> {
@@ -683,14 +749,14 @@ public class Notebook {
         .collect(Collectors.toList());
   }
 
-  public List<NoteInfo> getNotesInfo(Function<String, Boolean> func) {
+  public List<NoteInfo> getNotesInfo(Predicate<String> func) {
     String homescreenNoteId = 
conf.getString(ConfVars.ZEPPELIN_NOTEBOOK_HOMESCREEN);
     boolean hideHomeScreenNotebookFromList =
         conf.getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_HOMESCREEN_HIDE);
 
     synchronized (noteManager.getNotesInfo()) {
       List<NoteInfo> notesInfo = 
noteManager.getNotesInfo().entrySet().stream().filter(entry ->
-              func.apply(entry.getKey()) &&
+              func.test(entry.getKey()) &&
               ((!hideHomeScreenNotebookFromList) ||
                   ((hideHomeScreenNotebookFromList) && 
!entry.getKey().equals(homescreenNoteId))))
           .map(entry -> new NoteInfo(entry.getKey(), entry.getValue()))
@@ -752,6 +818,9 @@ public class Notebook {
   }
 
   public void close() {
+    if (initExecutor != null) {
+      ExecutorUtil.softShutdown("NotebookInit", initExecutor, 1, 
TimeUnit.MINUTES);
+    }
     this.notebookRepo.close();
   }
 
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJob.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJob.java
index 44463a1661..5ba06f77ab 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJob.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJob.java
@@ -34,7 +34,6 @@ public class CronJob implements org.quartz.Job {
 
   private static final String RESULT_SUCCEEDED = "succeeded";
   private static final String RESULT_FAILED = "failed";
-  private static final String RESULT_SKIPPED = "skipped";
 
   @Override
   public void execute(JobExecutionContext context) {
@@ -49,14 +48,6 @@ public class CronJob implements org.quartz.Job {
           context.setResult(RESULT_FAILED);
           return null;
         }
-        if (note.haveRunningOrPendingParagraphs()) {
-          LOGGER.warn(
-              "execution of the cron job is skipped because there is a running 
or pending "
-                  + "paragraph (note id: {})",
-              note.getId());
-          context.setResult(RESULT_SKIPPED);
-          return null;
-        }
         String cronExecutingUser = (String) 
note.getConfig().get("cronExecutingUser");
         String cronExecutingRoles = (String) 
note.getConfig().get("cronExecutingRoles");
         if (null == cronExecutingUser) {
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJobListener.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/MetricCronJobListener.java
similarity index 88%
rename from 
zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJobListener.java
rename to 
zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/MetricCronJobListener.java
index 6a6deb41eb..d7a92909d5 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJobListener.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/MetricCronJobListener.java
@@ -35,12 +35,12 @@ import io.micrometer.core.instrument.Tag;
 import io.micrometer.core.instrument.Tags;
 import io.micrometer.core.instrument.Timer;
 
-public class CronJobListener implements JobListener {
+public class MetricCronJobListener implements JobListener {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(CronJobListener.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MetricCronJobListener.class);
 
   // JobExecutionContext -> Timer.Sample
-  private Map<JobExecutionContext, Timer.Sample> cronJobTimerSamples = new 
HashMap<>();
+  private final Map<JobExecutionContext, Timer.Sample> cronJobTimerSamples = 
new HashMap<>();
 
   @Override
   public String getName() {
@@ -57,9 +57,7 @@ public class CronJobListener implements JobListener {
 
   @Override
   public void jobExecutionVetoed(JobExecutionContext context) {
-    JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
-    String noteId = jobDataMap.getString("noteId");
-    LOGGER.info("vetoed cron job of note: {}", noteId);
+    // do nothing
   }
 
   @Override
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/NoSchedulerService.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/NoSchedulerService.java
index 091b49efcf..3e55df2f1a 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/NoSchedulerService.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/NoSchedulerService.java
@@ -17,9 +17,6 @@
 
 package org.apache.zeppelin.notebook.scheduler;
 
-import java.util.Collections;
-import java.util.Set;
-
 public class NoSchedulerService implements SchedulerService {
   @Override
   public boolean refreshCron(String noteId) {
@@ -27,7 +24,7 @@ public class NoSchedulerService implements SchedulerService {
   }
 
   @Override
-  public Set<?> getJobs() {
-    return Collections.emptySet();
+  public int getJobsSize() {
+    return 0;
   }
 }
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/QuartzSchedulerService.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/QuartzSchedulerService.java
index eb3fce165b..53cf4225da 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/QuartzSchedulerService.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/QuartzSchedulerService.java
@@ -18,19 +18,12 @@
 package org.apache.zeppelin.notebook.scheduler;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.Map;
-import java.util.Set;
-
 import javax.inject.Inject;
 
-import com.google.common.annotations.VisibleForTesting;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
-import org.apache.zeppelin.notebook.Note;
 import org.apache.zeppelin.notebook.Notebook;
-import org.apache.zeppelin.user.AuthenticationInfo;
 import org.quartz.CronScheduleBuilder;
 import org.quartz.CronTrigger;
 import org.quartz.JobBuilder;
@@ -51,7 +44,6 @@ public class QuartzSchedulerService implements 
SchedulerService {
   private final ZeppelinConfiguration zeppelinConfiguration;
   private final Notebook notebook;
   private final Scheduler scheduler;
-  private final Thread loadingNotesThread;
 
   @Inject
   public QuartzSchedulerService(ZeppelinConfiguration zeppelinConfiguration, 
Notebook notebook)
@@ -59,115 +51,93 @@ public class QuartzSchedulerService implements 
SchedulerService {
     this.zeppelinConfiguration = zeppelinConfiguration;
     this.notebook = notebook;
     this.scheduler = getScheduler();
-    this.scheduler.getListenerManager().addJobListener(new CronJobListener());
+    this.scheduler.getListenerManager().addJobListener(new 
MetricCronJobListener());
+    this.scheduler.getListenerManager().addTriggerListener(new 
ZeppelinCronJobTriggerListerner());
     this.scheduler.start();
-
-    // Do in a separated thread because there may be many notes,
-    // loop all notes in the main thread may block the restarting of Zeppelin 
server
-    // TODO(zjffdu) It may cause issue when user delete note before this 
thread is finished
-    this.loadingNotesThread = new Thread(() -> {
-        LOGGER.info("Starting init cronjobs");
-        notebook.getNotesInfo().stream()
-                .forEach(entry -> {
-                  try {
-                    refreshCron(entry.getId());
-                  } catch (Exception e) {
-                    LOGGER.warn("Fail to refresh cron for note: {}", 
entry.getId());
-                  }
-                });
-        LOGGER.info("Complete init cronjobs");
-    });
-    loadingNotesThread.setName("Init CronJob Thread");
-    loadingNotesThread.setDaemon(true);
-    loadingNotesThread.start();
+    // Start Init
+    notebook.addInitConsumer(this::refreshCron);
   }
 
+
   private Scheduler getScheduler() throws SchedulerException {
     return new StdSchedulerFactory().getScheduler();
   }
 
-  /**
-   * This is only for testing, unit test should always call this method in 
setup() before testing.
-   */
-  @VisibleForTesting
-  public void waitForFinishInit() {
-    try {
-      loadingNotesThread.join();
-    } catch (InterruptedException e) {
-      LOGGER.warn("Unexpected exception", e);
-    }
-  }
-
   @Override
-  public boolean refreshCron(String noteId) throws IOException {
+  public boolean refreshCron(String noteId) {
     removeCron(noteId);
-    return notebook.processNote(noteId,
-      note -> {
-        if (note == null) {
-          LOGGER.warn("Skip refresh cron of note: {} because there's no such 
note", noteId);
-          return false;
-        }
-        if (note.isTrash()) {
-          LOGGER.warn("Skip refresh cron of note: {} because it is in trash", 
noteId);
-          return false;
-        }
-        Map<String, Object> config = note.getConfig();
-        if (config == null) {
-          LOGGER.warn("Skip refresh cron of note: {} because its config is 
empty.", noteId);
-          return false;
-        }
-        if (!note.isCronSupported(zeppelinConfiguration)) {
-          LOGGER.warn("Skip refresh cron of note {} because its cron is not 
enabled.", noteId);
-          return false;
-        }
-        String cronExpr = (String) note.getConfig().get("cron");
-        if (cronExpr == null || cronExpr.trim().length() == 0) {
-          LOGGER.warn("Skip refresh cron of note {} because its cron 
expression is empty.", noteId);
-          return false;
-        }
-        JobDataMap jobDataMap = new JobDataMap();
-        jobDataMap.put("notebook", notebook);
-        jobDataMap.put("noteId", noteId);
-        JobDetail newJob =
-            JobBuilder.newJob(CronJob.class)
-                .withIdentity(noteId, "note")
-                .setJobData(jobDataMap)
-                .build();
-        Map<String, Object> info = note.getInfo();
-        info.put("cron", null);
-        CronTrigger trigger = null;
-        try {
-          trigger =
-              TriggerBuilder.newTrigger()
-                  .withIdentity("trigger_" + noteId, "note")
-                  .withSchedule(CronScheduleBuilder.cronSchedule(cronExpr))
-                  .forJob(noteId, "note")
+    try {
+      return notebook.processNote(noteId,
+        note -> {
+          if (note == null) {
+            LOGGER.warn("Skip refresh cron of note: {} because there's no such 
note", noteId);
+            return false;
+          }
+          if (note.isTrash()) {
+            LOGGER.warn("Skip refresh cron of note: {} because it is in 
trash", noteId);
+            return false;
+          }
+          Map<String, Object> config = note.getConfig();
+          if (config == null) {
+            LOGGER.warn("Skip refresh cron of note: {} because its config is 
empty.", noteId);
+            return false;
+          }
+          if (!note.isCronSupported(zeppelinConfiguration)) {
+            LOGGER.warn("Skip refresh cron of note {} because its cron is not 
enabled.", noteId);
+            return false;
+          }
+          String cronExpr = (String) note.getConfig().get("cron");
+          if (StringUtils.isBlank(cronExpr)) {
+            LOGGER.warn("Skip refresh cron of note: {} because its cron 
expression is empty.", noteId);
+            return false;
+          }
+          JobDataMap jobDataMap = new JobDataMap();
+          jobDataMap.put("notebook", notebook);
+          jobDataMap.put("noteId", noteId);
+          JobDetail newJob =
+              JobBuilder.newJob(CronJob.class)
+                  .withIdentity(noteId, "note")
+                  .setJobData(jobDataMap)
                   .build();
-        } catch (Exception e) {
-          LOGGER.error("Fail to create cron trigger for note: {}", noteId, e);
-          info.put("cron", e.getMessage());
-          return false;
-        }
+          Map<String, Object> info = note.getInfo();
+          info.put("cron", null);
+          CronTrigger trigger = null;
+          try {
+            trigger =
+                TriggerBuilder.newTrigger()
+                    .withIdentity("trigger_" + noteId, "note")
+                    .withSchedule(CronScheduleBuilder.cronSchedule(cronExpr))
+                    .forJob(noteId, "note")
+                    .build();
+          } catch (Exception e) {
+            LOGGER.error("Fail to create cron trigger for note: {}", noteId, 
e);
+            info.put("cron", e.getMessage());
+            return false;
+          }
 
-        try {
-          LOGGER.info("Trigger cron for note: {}, with cron expression: {}",  
noteId, cronExpr);
-          scheduler.scheduleJob(newJob, trigger);
-          return true;
-        } catch (SchedulerException e) {
-          LOGGER.error("Fail to schedule cron job for note: {}", noteId, e);
-          info.put("cron", "Scheduler Exception");
-          return false;
-        }
-      });
+          try {
+            LOGGER.info("Trigger cron for note: {}, with cron expression: {}", 
 noteId, cronExpr);
+            scheduler.scheduleJob(newJob, trigger);
+            return true;
+          } catch (SchedulerException e) {
+            LOGGER.error("Fail to schedule cron job for note: {}", noteId, e);
+            info.put("cron", "Scheduler Exception");
+            return false;
+          }
+        });
+    } catch (IOException e) {
+      LOGGER.error("Fail to schedule cron job for note: {}", noteId, e);
+      return false;
+    }
   }
 
   @Override
-  public Set<?> getJobs() {
+  public int getJobsSize() {
     try {
-      return scheduler.getJobKeys(GroupMatcher.anyGroup());
+      return scheduler.getJobKeys(GroupMatcher.anyGroup()).size();
     } catch (SchedulerException e) {
       LOGGER.error("Error while getting jobKeys", e);
-      return Collections.emptySet();
+      return 0;
     }
   }
 
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/SchedulerService.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/SchedulerService.java
index 535ddf0455..ac4c07607f 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/SchedulerService.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/SchedulerService.java
@@ -17,10 +17,16 @@
 
 package org.apache.zeppelin.notebook.scheduler;
 
-import java.io.IOException;
-import java.util.Set;
-
 public interface SchedulerService {
-  boolean refreshCron(String noteId) throws IOException;
-  Set<?> getJobs();
+
+  /**
+   * @param noteId
+   * @return return true if the cron refresh was successfull
+   */
+  boolean refreshCron(String noteId);
+
+  /**
+   * @return size of queued jobs
+   */
+  int getJobsSize();
 }
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/ZeppelinCronJobTriggerListerner.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/ZeppelinCronJobTriggerListerner.java
new file mode 100644
index 0000000000..1663f7373a
--- /dev/null
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/ZeppelinCronJobTriggerListerner.java
@@ -0,0 +1,61 @@
+package org.apache.zeppelin.notebook.scheduler;
+
+import java.io.IOException;
+
+import org.apache.zeppelin.notebook.Notebook;
+import org.quartz.JobDataMap;
+import org.quartz.JobExecutionContext;
+import org.quartz.Trigger;
+import org.quartz.Trigger.CompletedExecutionInstruction;
+import org.quartz.TriggerListener;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZeppelinCronJobTriggerListerner implements TriggerListener {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ZeppelinCronJobTriggerListerner.class);
+
+  @Override
+  public String getName() {
+    return "ZeppelinCronJobTriggerListerner";
+  }
+
+  @Override
+  public void triggerFired(Trigger trigger, JobExecutionContext context) {
+    // Do nothing
+  }
+
+  @Override
+  public boolean vetoJobExecution(Trigger trigger, JobExecutionContext 
context) {
+    JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
+    String noteId = jobDataMap.getString("noteId");
+    Notebook notebook = (Notebook) jobDataMap.get("notebook");
+    try {
+        return notebook.processNote(noteId, note -> {
+            if (note.haveRunningOrPendingParagraphs()) {
+                LOGGER.warn(
+                    "execution of the cron job is skipped because there is a 
running or pending paragraph (noteId: {})",
+                    noteId);
+                return true;
+            }
+            return false;
+        });
+    } catch (IOException e) {
+        LOGGER.warn("Failed to check CronJob of note: {} because fail to get 
it", noteId ,e);
+        return true;
+    }
+  }
+
+  @Override
+  public void triggerMisfired(Trigger trigger) {
+    // Do nothing
+  }
+
+  @Override
+  public void triggerComplete(Trigger trigger, JobExecutionContext context,
+      CompletedExecutionInstruction triggerInstructionCode) {
+    // Do nothing
+  }
+
+}
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/LuceneSearch.java 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/LuceneSearch.java
index fe420edf8f..ff4f60ef6e 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/LuceneSearch.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/LuceneSearch.java
@@ -104,7 +104,7 @@ public class LuceneSearch extends SearchService {
       throw new IOException("Failed to create new IndexWriter", e);
     }
     if (conf.isIndexRebuild()) {
-      startRebuildIndex();
+      notebook.addInitConsumer(this::addNoteIndex);
     }
     this.notebook.addNotebookEventListener(this);
   }
@@ -442,22 +442,4 @@ public class LuceneSearch extends SearchService {
     }
     updateDoc(noteId, noteName, null);
   }
-
-  public void startRebuildIndex() {
-    Thread thread = new Thread(() -> {
-      LOGGER.info("Starting rebuild index");
-      notebook.getNotesInfo().forEach(noteInfo -> {
-        addNoteIndex(noteInfo.getId());
-      });
-      LOGGER.info("Finish rebuild index");
-    });
-    thread.setName("LuceneSearch-RebuildIndex-Thread");
-    thread.start();
-    try {
-      thread.join();
-    } catch (InterruptedException e) {
-      LOGGER.warn("Lucene Rebuild Index Thread interrupted!", e);
-      Thread.currentThread().interrupt();
-    }
-  }
 }
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
index 8715f0fa8f..5869f3b5dd 100644
--- 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
@@ -104,7 +104,8 @@ public class NotebookTest extends AbstractInterpreterTest 
implements ParagraphJo
     notebook = new Notebook(conf, authorizationService, notebookRepo, 
noteManager, interpreterFactory, interpreterSettingManager, credentials, null);
     notebook.setParagraphJobListener(this);
     schedulerService = new QuartzSchedulerService(conf, notebook);
-    schedulerService.waitForFinishInit();
+    notebook.initNotebook();
+    notebook.waitForFinishInit(1, TimeUnit.MINUTES);
   }
 
   @Override
@@ -991,9 +992,9 @@ public class NotebookTest extends AbstractInterpreterTest 
implements ParagraphJo
         note.setConfig(config);
         return null;
       });
-    final int jobsBeforeRefresh = schedulerService.getJobs().size();
+    final int jobsBeforeRefresh = schedulerService.getJobsSize();
     schedulerService.refreshCron(noteId);
-    final int jobsAfterRefresh = schedulerService.getJobs().size();
+    final int jobsAfterRefresh = schedulerService.getJobsSize();
 
     assertEquals(jobsBeforeRefresh, jobsAfterRefresh);
 

Reply via email to