This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch branch-0.9 in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.9 by this push: new f6facc8 [ZEPPELIN-4832]. run note in non-blocking way could not guarantee the paragraph execution order f6facc8 is described below commit f6facc83db167ce20154961cf77406e046cc920b Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Wed May 20 00:02:43 2020 +0800 [ZEPPELIN-4832]. run note in non-blocking way could not guarantee the paragraph execution order ### What is this PR for? Before this PR, if a user ran the note in a non-blocking way via the REST API, the execution order of paragraphs was not deterministic: paragraph 2 would start running as soon as paragraph 1 entered the running state. However, paragraph 2 should only start once paragraph 1 has finished. This PR fixes the issue with a minor change to `RemoteScheduler` and introduces the `.execution.mode` property, which tells `RemoteScheduler` whether the job is running as an individual paragraph or as part of a note execution. ### What type of PR is it? [Bug Fix] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-4832 ### How should this be tested? * A unit test is added ### Screenshots (if appropriate) ### Questions: * Do the license files need updating? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #3775 from zjffdu/ZEPPELIN-4832 and squashes the following commits: 36338a83f [Jeff Zhang] [ZEPPELIN-4832]. 
run note in non-blocking way could not guarantee the paragraph execution order (cherry picked from commit 6d5783c694cef8a09a5e9f708f93d5a5460fbc16) Signed-off-by: Jeff Zhang <zjf...@apache.org> --- .../apache/zeppelin/rest/NotebookRestApiTest.java | 44 +++++++++++++++++++++- .../interpreter/remote/RemoteInterpreter.java | 23 ++++++++--- .../java/org/apache/zeppelin/notebook/Note.java | 18 ++++++++- .../org/apache/zeppelin/notebook/Paragraph.java | 8 ++++ .../apache/zeppelin/scheduler/RemoteScheduler.java | 35 +++++++++++++---- 5 files changed, 113 insertions(+), 15 deletions(-) diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookRestApiTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookRestApiTest.java index 328a1b2..b4fbb3d 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookRestApiTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookRestApiTest.java @@ -193,7 +193,7 @@ public class NotebookRestApiTest extends AbstractTestRestApi { } @Test - public void testRunAllParagraph_AllSuccess() throws IOException { + public void testRunNoteBlocking() throws IOException { Note note1 = null; try { note1 = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous); @@ -232,6 +232,48 @@ public class NotebookRestApiTest extends AbstractTestRestApi { } @Test + public void testRunNoteNonBlocking() throws Exception { + Note note1 = null; + try { + note1 = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous); + // 2 paragraphs + // P1: + // %python + // import time + // time.sleep(5) + // name='hello' + // z.put('name', name) + // P2: + // %%sh(interpolate=true) + // echo '{name}' + // + Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS); + Paragraph p2 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS); + p1.setText("%python import time\ntime.sleep(5)\nname='hello'\nz.put('name', name)"); + p2.setText("%sh(interpolate=true) echo '{name}'"); 
+ + PostMethod post = httpPost("/notebook/job/" + note1.getId() + "?waitToFinish=false", ""); + assertThat(post, isAllowed()); + Map<String, Object> resp = gson.fromJson(post.getResponseBodyAsString(), + new TypeToken<Map<String, Object>>() {}.getType()); + assertEquals(resp.get("status"), "OK"); + post.releaseConnection(); + + p1.waitUntilFinished(); + p2.waitUntilFinished(); + + assertEquals(Job.Status.FINISHED, p1.getStatus()); + assertEquals(Job.Status.FINISHED, p2.getStatus()); + assertEquals("hello\n", p2.getReturn().message().get(0).getData()); + } finally { + // cleanup + if (null != note1) { + TestUtils.getInstance(Notebook.class).removeNote(note1.getId(), anonymous); + } + } + } + + @Test public void testRunAllParagraph_FirstFailed() throws IOException { Note note1 = null; try { diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java index 692224b..d1604c1 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java @@ -391,11 +391,24 @@ public class RemoteInterpreter extends Interpreter { public Scheduler getScheduler() { // one session own one Scheduler, so that when one session is closed, all the jobs/paragraphs // running under the scheduler of this session will be aborted. 
- Scheduler s = new RemoteScheduler( - RemoteInterpreter.class.getSimpleName() + "-" + getInterpreterGroup().getId() + "-" + sessionId, - SchedulerFactory.singleton().getExecutor(), - this); - return SchedulerFactory.singleton().createOrGetScheduler(s); + String executionMode = getProperty(".execution.mode", "paragraph"); + if (executionMode.equals("paragraph")) { + Scheduler s = new RemoteScheduler( + RemoteInterpreter.class.getSimpleName() + "-" + getInterpreterGroup().getId() + "-" + sessionId, + SchedulerFactory.singleton().getExecutor(), + this); + return SchedulerFactory.singleton().createOrGetScheduler(s); + } else if (executionMode.equals("note")) { + String noteId = getProperty(".noteId"); + Scheduler s = new RemoteScheduler( + RemoteInterpreter.class.getSimpleName() + "-" + noteId, + SchedulerFactory.singleton().getExecutor(), + this); + return SchedulerFactory.singleton().createOrGetScheduler(s); + } else { + throw new RuntimeException("Invalid execution mode: " + executionMode); + } + } private RemoteInterpreterContext convert(InterpreterContext ic) { diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java index ce70491..55cb4f4 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java @@ -747,7 +747,8 @@ public class Note implements JsonSerializable { } } - public void runAll(AuthenticationInfo authenticationInfo, boolean blocking) throws Exception { + public void runAll(AuthenticationInfo authenticationInfo, + boolean blocking) throws Exception { setRunning(true); try { for (Paragraph p : getParagraphs()) { @@ -755,6 +756,17 @@ public class Note implements JsonSerializable { continue; } p.setAuthenticationInfo(authenticationInfo); + try { + Interpreter interpreter = p.getBindedInterpreter(); + if (interpreter != null) { + // set interpreter property to 
execution.mode to be note + // so that it could use the correct scheduler. see ZEPPELIN-4832 + interpreter.setProperty(".execution.mode", "note"); + interpreter.setProperty(".noteId", id); + } + } catch (InterpreterNotFoundException e) { + // ignore, because the following run method will fail if interpreter not found. + } if (!run(p.getId(), blocking)) { logger.warn("Skip running the remain notes because paragraph {} fails", p.getId()); throw new Exception("Fail to run note because paragraph " + p.getId() + " is failed, " + @@ -787,7 +799,9 @@ public class Note implements JsonSerializable { * @param ctxUser * @return */ - public boolean run(String paragraphId, boolean blocking, String ctxUser) { + public boolean run(String paragraphId, + boolean blocking, + String ctxUser) { Paragraph p = getParagraph(paragraphId); if (isPersonalizedMode() && ctxUser != null) diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java index 0ff95c3..0086a47 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java @@ -707,6 +707,14 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen } } + @VisibleForTesting + public void waitUntilFinished() throws Exception { + while(!isTerminated()) { + LOGGER.debug("Wait for paragraph to be finished"); + Thread.sleep(1000); + } + } + private GUI getNoteGui() { GUI gui = new GUI(); gui.setParams(this.note.getNoteParams()); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java index 5f19df1..3797c8b 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java @@ 
-47,14 +47,31 @@ public class RemoteScheduler extends AbstractScheduler { public void runJobInScheduler(Job job) { JobRunner jobRunner = new JobRunner(this, job); executor.execute(jobRunner); - // wait until it is submitted to the remote - while (!jobRunner.isJobSubmittedInRemote()) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - LOGGER.error("Exception in RemoteScheduler while jobRunner.isJobSubmittedInRemote " + - "queue.wait", e); + String executionMode = + remoteInterpreter.getProperty(".execution.mode", "paragraph"); + if (executionMode.equals("paragraph")) { + // wait until it is submitted to the remote + while (!jobRunner.isJobSubmittedInRemote()) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + LOGGER.error("Exception in RemoteScheduler while jobRunner.isJobSubmittedInRemote " + + "queue.wait", e); + } } + } else if (executionMode.equals("note")){ + // wait until it is finished + while (!jobRunner.isJobExecuted()) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + LOGGER.error("Exception in RemoteScheduler while jobRunner.isJobExecuted " + + "queue.wait", e); + } + } + } else { + throw new RuntimeException("Invalid job execution.mode: " + executionMode + + ", only 'note' and 'paragraph' are valid"); } } @@ -152,6 +169,10 @@ public class RemoteScheduler extends AbstractScheduler { return jobSubmittedRemotely; } + public boolean isJobExecuted() { + return jobExecuted; + } + @Override public void run() { JobStatusPoller jobStatusPoller = new JobStatusPoller(job, this, 100);