This is an automated email from the ASF dual-hosted git repository. pdallig pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new debfa7fcc2 [ZEPPELIN-5900] Fix RemoteSchedulerTest (#4582) debfa7fcc2 is described below commit debfa7fcc23d6f5ea2f0664308cdbe7eba78b419 Author: Philipp Dallig <philipp.dal...@gmail.com> AuthorDate: Tue May 2 08:37:44 2023 +0200 [ZEPPELIN-5900] Fix RemoteSchedulerTest (#4582) --- .../zeppelin/scheduler/AbstractScheduler.java | 24 +++--- .../java/org/apache/zeppelin/scheduler/Job.java | 8 ++ .../interpreter/remote/RemoteInterpreter.java | 14 ++-- .../apache/zeppelin/scheduler/RemoteScheduler.java | 85 ++++++++++++---------- .../zeppelin/scheduler/RemoteSchedulerTest.java | 1 + 5 files changed, 72 insertions(+), 60 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java index fdd456be18..0f95088df3 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java @@ -36,7 +36,7 @@ public abstract class AbstractScheduler implements Scheduler { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractScheduler.class); - protected String name; + protected final String name; protected volatile boolean terminate = false; protected BlockingQueue<Job> queue = new LinkedBlockingQueue<>(); protected Map<String, Job> jobs = new ConcurrentHashMap<>(); @@ -89,11 +89,14 @@ public abstract class AbstractScheduler implements Scheduler { runningJob = queue.take(); } catch (InterruptedException e) { LOGGER.warn("{} is interrupted", getClass().getSimpleName()); + // Restore interrupted state... + Thread.currentThread().interrupt(); break; } runJobInScheduler(runningJob); } + stop(); } public abstract void runJobInScheduler(Job job); @@ -124,7 +127,7 @@ public abstract class AbstractScheduler implements Scheduler { return; } - LOGGER.info("Job {} started by scheduler {}",runningJob.getId(), name); + LOGGER.info("Job {} started by scheduler {}", runningJob.getId(), name); // Don't set RUNNING status when it is RemoteScheduler, update it via JobStatusPoller if (!getClass().getSimpleName().equals("RemoteScheduler")) { runningJob.setStatus(Job.Status.RUNNING); @@ -133,21 +136,14 @@ public abstract class AbstractScheduler implements Scheduler { Object jobResult = runningJob.getReturn(); synchronized (runningJob) { if (runningJob.isAborted()) { + LOGGER.debug("Job Aborted, {}, {}", runningJob.getId(), runningJob.getErrorMessage()); runningJob.setStatus(Job.Status.ABORT); - LOGGER.debug("Job Aborted, " + runningJob.getId() + ", " + - runningJob.getErrorMessage()); - } else if (runningJob.getException() != null) { - LOGGER.debug("Job Error, " + runningJob.getId() + ", " + - runningJob.getReturn()); - runningJob.setStatus(Job.Status.ERROR); - } else if (jobResult != null && jobResult instanceof InterpreterResult - && ((InterpreterResult) jobResult).code() == InterpreterResult.Code.ERROR) { - LOGGER.debug("Job Error, " + runningJob.getId() + ", " + - runningJob.getReturn()); + } else if (runningJob.getException() != null || (jobResult instanceof InterpreterResult + && ((InterpreterResult) jobResult).code() == InterpreterResult.Code.ERROR)) { + LOGGER.debug("Job Error, {}, {}", runningJob.getId(), runningJob.getReturn()); runningJob.setStatus(Job.Status.ERROR); } else { - LOGGER.debug("Job Finished, " + runningJob.getId() + ", Result: " + - runningJob.getReturn()); + LOGGER.debug("Job Finished, {}, Result: {}", runningJob.getId(), runningJob.getReturn()); runningJob.setStatus(Job.Status.FINISHED); } } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java index 7d06d490c9..55d76d8e94 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java @@ -70,6 +70,14 @@ public abstract class Job<T> { public boolean isCompleted() { return this == FINISHED || this == ERROR || this == ABORT; } + + public boolean isAbort() { + return this == ABORT; + } + + public boolean isFailed() { + return this == ERROR || this == ABORT; + } } private String jobName; diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java index 02f1b624ba..ba3be51048 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java @@ -30,7 +30,6 @@ import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterResult; -import org.apache.zeppelin.interpreter.LifecycleManager; import org.apache.zeppelin.interpreter.ManagedInterpreterGroup; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext; @@ -342,17 +341,14 @@ public class RemoteInterpreter extends Interpreter { // running under the scheduler of this session will be aborted. String executionMode = getProperty(".execution.mode", "paragraph"); if (executionMode.equals("paragraph")) { - Scheduler s = new RemoteScheduler( - RemoteInterpreter.class.getSimpleName() + "-" + getInterpreterGroup().getId() + "-" + sessionId, - SchedulerFactory.singleton().getExecutor(), - this); + String name = RemoteInterpreter.class.getSimpleName() + "-" + getInterpreterGroup().getId() + + "-" + sessionId; + Scheduler s = new RemoteScheduler(name, this); return SchedulerFactory.singleton().createOrGetScheduler(s); } else if (executionMode.equals("note")) { String noteId = getProperty(".noteId"); - Scheduler s = new RemoteScheduler( - RemoteInterpreter.class.getSimpleName() + "-" + noteId, - SchedulerFactory.singleton().getExecutor(), - this); + String name = RemoteInterpreter.class.getSimpleName() + "-" + noteId; + Scheduler s = new RemoteScheduler(name, this); return SchedulerFactory.singleton().createOrGetScheduler(s); } else { throw new RuntimeException("Invalid execution mode: " + executionMode); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java index d8797fff89..8c068f9f60 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java @@ -24,7 +24,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; /** * RemoteScheduler runs in ZeppelinServer and proxies Scheduler running on RemoteInterpreter. @@ -34,14 +36,14 @@ import java.util.concurrent.TimeUnit; public class RemoteScheduler extends AbstractScheduler { private static final Logger LOGGER = LoggerFactory.getLogger(RemoteScheduler.class); - private RemoteInterpreter remoteInterpreter; - private ExecutorService executor; + private final RemoteInterpreter remoteInterpreter; + private final ExecutorService executor; public RemoteScheduler(String name, - ExecutorService executor, RemoteInterpreter remoteInterpreter) { super(name); - this.executor = executor; + this.executor = + Executors.newSingleThreadExecutor(new SchedulerThreadFactory("FIFO-" + name + "-")); this.remoteInterpreter = remoteInterpreter; } @@ -53,22 +55,26 @@ public class RemoteScheduler extends AbstractScheduler { remoteInterpreter.getProperty(".execution.mode", "paragraph"); if (executionMode.equals("paragraph")) { // wait until it is submitted to the remote - while (!jobRunner.isJobSubmittedInRemote()) { + while (!jobRunner.isJobSubmittedInRemote() && !Thread.currentThread().isInterrupted()) { try { Thread.sleep(100); } catch (InterruptedException e) { LOGGER.error("Exception in RemoteScheduler while jobRunner.isJobSubmittedInRemote " + "queue.wait", e); + // Restore interrupted state... + Thread.currentThread().interrupt(); } } } else if (executionMode.equals("note")){ // wait until it is finished - while (!jobRunner.isJobExecuted()) { + while (!jobRunner.isJobExecuted() && !Thread.currentThread().isInterrupted()) { try { Thread.sleep(100); } catch (InterruptedException e) { LOGGER.error("Exception in RemoteScheduler while jobRunner.isJobExecuted " + "queue.wait", e); + // Restore interrupted state... + Thread.currentThread().interrupt(); } } } else { @@ -82,10 +88,11 @@ public class RemoteScheduler extends AbstractScheduler { * RUNNING status. This thread will exist after job is in RUNNING/FINISHED state. */ private class JobStatusPoller extends Thread { - private long checkIntervalMsec; - private volatile boolean terminate; - private JobListener listener; - private Job job; + private final Logger logger = LoggerFactory.getLogger(JobRunner.class); + private final long checkIntervalMsec; + private final AtomicBoolean terminate; + private final JobListener listener; + private final Job job; private volatile Status lastStatus; public JobStatusPoller(Job job, @@ -95,41 +102,38 @@ public class RemoteScheduler extends AbstractScheduler { this.checkIntervalMsec = checkIntervalMsec; this.job = job; this.listener = listener; - this.terminate = false; + this.terminate = new AtomicBoolean(false); } @Override public void run() { - while (terminate == false) { - synchronized (this) { - try { - this.wait(checkIntervalMsec); - } catch (InterruptedException e) { - LOGGER.error("Exception in RemoteScheduler while run this.wait", e); - } - } - - if (terminate) { - // terminated by shutdown - break; - } - + while (!terminate.get() && !Thread.currentThread().isInterrupted()) { Status newStatus = getStatus(); if (newStatus == Status.RUNNING || newStatus == Status.FINISHED || newStatus == Status.ERROR || newStatus == Status.ABORT) { // Exit this thread when job is in RUNNING/FINISHED/ERROR/ABORT state. - break; + terminate.set(true); + } else { + synchronized (terminate) { + try { + terminate.wait(checkIntervalMsec); + } catch (InterruptedException e) { + logger.error("Exception in RemoteScheduler while run this.wait", e); + // Restore interrupted state... + Thread.currentThread().interrupt(); + } + } } } - terminate = true; + terminate.set(true); } public void shutdown() { - terminate = true; - synchronized (this) { - this.notify(); + synchronized (terminate) { + terminate.set(true); + terminate.notifyAll(); } } @@ -155,8 +159,8 @@ public class RemoteScheduler extends AbstractScheduler { private class JobRunner implements Runnable, JobListener { private final Logger logger = LoggerFactory.getLogger(JobRunner.class); - private RemoteScheduler scheduler; - private Job job; + private final RemoteScheduler scheduler; + private final Job job; private volatile boolean jobExecuted; private volatile boolean jobSubmittedRemotely; @@ -187,6 +191,8 @@ public class RemoteScheduler extends AbstractScheduler { jobStatusPoller.join(); } catch (InterruptedException e) { logger.error("JobStatusPoller interrupted", e); + // Restore interrupted state... + Thread.currentThread().interrupt(); } } @@ -197,7 +203,11 @@ public class RemoteScheduler extends AbstractScheduler { // Call by JobStatusPoller thread, update status when JobStatusPoller get new status. @Override public void onStatusChange(Job job, Status before, Status after) { - if (jobExecuted == false) { + if (!job.equals(this.job)) { + logger.error("StatusChange for an unkown job. {} != {}", this.job.getId(), job.getId()); + return; + } + if (!jobExecuted) { if (after == Status.FINISHED || after == Status.ABORT || after == Status.ERROR) { // it can be status of last run. @@ -205,7 +215,7 @@ public class RemoteScheduler extends AbstractScheduler { return; } else if (after == Status.RUNNING) { jobSubmittedRemotely = true; - job.setStatus(Status.RUNNING); + this.job.setStatus(Status.RUNNING); } } else { jobSubmittedRemotely = true; @@ -214,9 +224,9 @@ public class RemoteScheduler extends AbstractScheduler { // only set status when the status fetched from JobStatusPoller is RUNNING, // the status of job itself is still in PENDING. // Because the status from JobStatusPoller may happen after the job is finished. - synchronized (job) { - if (after == Status.RUNNING && job.getStatus() == Status.PENDING) { - job.setStatus(Status.RUNNING); + synchronized (this.job) { + if (after == Status.RUNNING && this.job.getStatus() == Status.PENDING) { + this.job.setStatus(Status.RUNNING); } } } @@ -225,6 +235,7 @@ public class RemoteScheduler extends AbstractScheduler { @Override public void stop(int stopTimeoutVal, TimeUnit stopTimeoutUnit) { super.stop(); + ExecutorUtil.softShutdown(name, executor, stopTimeoutVal, stopTimeoutUnit); } } diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java index 5240ab906c..3aee8a9c72 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java @@ -260,6 +260,7 @@ public class RemoteSchedulerTest extends AbstractInterpreterTest assertNotNull(job1.getDateFinished()); assertTrue(job1.isTerminated()); + assertEquals("1000", job1.getReturn()); assertNull(job2.getDateFinished()); assertTrue(job2.isTerminated()); assertEquals("result2", job2.getReturn());