This is an automated email from the ASF dual-hosted git repository. moon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new 7c7d51a [ZEPPELIN-4619] Run a note from the commandline 7c7d51a is described below commit 7c7d51a2d5876456fe1a7eba2f985454213e0e32 Author: Lee moon soo <m...@apache.org> AuthorDate: Thu Feb 27 17:34:45 2020 -0800 [ZEPPELIN-4619] Run a note from the commandline ### What is this PR for? This PR adds a command-line option to run a single note. The code is picked from https://github.com/apache/zeppelin/pull/3356. Usage is ``` bin/zeppelin.sh --run <noteId> ``` The command will launch a Zeppelin server and run the given note, then terminate either after all paragraphs have run successfully (with exit 0) or on any paragraph error (with exit 1). ### What type of PR is it? Feature ### What is the Jira issue? https://issues.apache.org/jira/browse/ZEPPELIN-4619 ### Questions: * Do the license files need an update? no * Are there breaking changes for older versions? no * Does this need documentation? no Author: Lee moon soo <m...@apache.org> This patch had conflicts when merged, resolved by Committer: Lee moon soo <m...@apache.org> Closes #3654 from Leemoonsoo/ZEPPELIN-4619 and squashes the following commits: 6a8316388 [Lee moon soo] remove useless test a7531a49f [Lee moon soo] address comment 7743b43f6 [Lee moon soo] add log 99974f89d [Lee moon soo] overload loading runAllParagraphs() 101d1557b [Lee moon soo] add --run option --- bin/zeppelin.sh | 34 +++++---- .../zeppelin/conf/ZeppelinConfiguration.java | 22 ++++++ .../org/apache/zeppelin/server/ZeppelinServer.java | 87 +++++++++++++++++----- .../apache/zeppelin/service/NotebookService.java | 33 ++++++-- .../zeppelin/service/NotebookServiceTest.java | 18 +++-- 5 files changed, 149 insertions(+), 45 deletions(-) diff --git a/bin/zeppelin.sh b/bin/zeppelin.sh index bf5aaba..9f32fb4 100755 --- a/bin/zeppelin.sh +++ b/bin/zeppelin.sh @@ -36,20 +36,26 @@ if [ -z "$uidentry" ] ; then fi fi -USAGE="Usage: bin/zeppelin.sh [--config <conf-dir>]" - -if [[ "$1" == 
"--config" ]]; then - shift - conf_dir="$1" - if [[ ! -d "${conf_dir}" ]]; then - echo "ERROR : ${conf_dir} is not a directory" - echo ${USAGE} - exit 1 - else - export ZEPPELIN_CONF_DIR="${conf_dir}" - fi - shift -fi +USAGE="Usage: bin/zeppelin.sh [--config <conf-dir>] [--run <noteId>]" + +POSITIONAL=() +while [[ $# -gt 0 ]] +do + key="$1" + case $key in + --config) + export ZEPPELIN_CONF_DIR="$2" + shift # past argument + shift # past value + ;; + --run) + export ZEPPELIN_NOTEBOOK_RUN_ID="$2" + shift # past argument + shift # past value + ;; + esac +done +set -- "${POSITIONAL[@]}" # restore positional parameters bin=$(dirname "${BASH_SOURCE-$0}") bin=$(cd "${bin}">/dev/null; pwd) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index 78fdb86..43f9ec7 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -397,6 +397,22 @@ public class ZeppelinConfiguration extends XMLConfiguration { return getString(ConfVars.ZEPPELIN_NOTEBOOK_DIR); } + public String getNotebookRunId() { + return getString(ConfVars.ZEPPELIN_NOTEBOOK_RUN_ID); + } + + public String getNotebookRunRev() { + return getString(ConfVars.ZEPPELIN_NOTEBOOK_RUN_REV); + } + + public String getNotebookRunServiceContext() { + return getString(ConfVars.ZEPPELIN_NOTEBOOK_RUN_SERVICE_CONTEXT); + } + + public boolean getNotebookRunAutoShutdown() { + return getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_RUN_AUTOSHUTDOWN); + } + public String getPluginsDir() { return getRelativeDir(getString(ConfVars.ZEPPELIN_PLUGINS_DIR)); } @@ -831,6 +847,12 @@ public class ZeppelinConfiguration extends XMLConfiguration { ZEPPELIN_INTERPRETER_OUTPUT_LIMIT("zeppelin.interpreter.output.limit", 1024 * 100), ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"), 
ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"), + + ZEPPELIN_NOTEBOOK_RUN_ID("zeppelin.notebook.run.id", null), // run particular note id on zeppelin start + ZEPPELIN_NOTEBOOK_RUN_REV("zeppelin.notebook.run.rev", null), // revision id for ZEPPELIN_NOTEBOOK_RUN_ID. + ZEPPELIN_NOTEBOOK_RUN_SERVICE_CONTEXT("zeppelin.notebook.run.servicecontext", null), // base64 encoded serialized service context to be used ZEPPELIN_NOTEBOOK_RUN_ID. + ZEPPELIN_NOTEBOOK_RUN_AUTOSHUTDOWN("zeppelin.notebook.run.autoshutdown", true), // after specified note (ZEPPELIN_NOTEBOOK_RUN_ID) run, shutdown zeppelin server + ZEPPELIN_RECOVERY_DIR("zeppelin.recovery.dir", "recovery"), ZEPPELIN_RECOVERY_STORAGE_CLASS("zeppelin.recovery.storage.class", "org.apache.zeppelin.interpreter.recovery.NullRecoveryStorage"), diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java index 258c930..0e5be41 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java @@ -16,9 +16,12 @@ */ package org.apache.zeppelin.server; +import com.google.gson.Gson; import java.io.File; import java.io.IOException; import java.lang.management.ManagementFactory; +import java.util.Base64; +import java.util.HashSet; import java.util.List; import java.util.EnumSet; import java.util.Objects; @@ -30,6 +33,7 @@ import javax.servlet.DispatcherType; import javax.servlet.ServletContextEvent; import javax.servlet.ServletContextListener; import org.apache.commons.lang.StringUtils; +import org.apache.directory.api.util.Strings; import org.apache.shiro.web.env.EnvironmentLoaderListener; import org.apache.shiro.web.servlet.ShiroFilter; import org.apache.zeppelin.cluster.ClusterManagerServer; @@ -49,6 +53,7 @@ import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener; import 
org.apache.zeppelin.notebook.NoteEventListener; import org.apache.zeppelin.notebook.Notebook; import org.apache.zeppelin.notebook.AuthorizationService; +import org.apache.zeppelin.notebook.Paragraph; import org.apache.zeppelin.notebook.repo.NotebookRepo; import org.apache.zeppelin.notebook.repo.NotebookRepoSync; import org.apache.zeppelin.notebook.scheduler.NoSchedulerService; @@ -61,6 +66,7 @@ import org.apache.zeppelin.search.SearchService; import org.apache.zeppelin.service.*; import org.apache.zeppelin.service.AuthenticationService; import org.apache.zeppelin.socket.NotebookServer; +import org.apache.zeppelin.user.AuthenticationInfo; import org.apache.zeppelin.user.Credentials; import org.apache.zeppelin.util.ReflectionUtils; import org.eclipse.jetty.http.HttpVersion; @@ -117,7 +123,7 @@ public class ZeppelinServer extends ResourceConfig { packages("org.apache.zeppelin.rest"); } - public static void main(String[] args) throws InterruptedException { + public static void main(String[] args) throws InterruptedException, IOException { ZeppelinServer.conf = ZeppelinConfiguration.create(); conf.setProperty("args", args); @@ -254,23 +260,9 @@ public class ZeppelinServer extends ResourceConfig { } LOG.info("Done, zeppelin server started"); - Runtime.getRuntime() - .addShutdownHook( - new Thread( - () -> { - LOG.info("Shutting down Zeppelin Server ... 
"); - try { - jettyWebServer.stop(); - if (!conf.isRecoveryEnabled()) { - sharedServiceLocator.getService(InterpreterSettingManager.class).close(); - } - sharedServiceLocator.getService(Notebook.class).close(); - Thread.sleep(3000); - } catch (Exception e) { - LOG.error("Error while stopping servlet container", e); - } - LOG.info("Bye"); - })); + runNoteOnStart(conf); + + Runtime.getRuntime().addShutdownHook(shutdown(conf)); // when zeppelin is started inside of ide (especially for eclipse) // for graceful shutdown, input any key in console window @@ -289,6 +281,24 @@ public class ZeppelinServer extends ResourceConfig { } } + private static Thread shutdown(ZeppelinConfiguration conf) { + return new Thread( + () -> { + LOG.info("Shutting down Zeppelin Server ... "); + try { + jettyWebServer.stop(); + if (!conf.isRecoveryEnabled()) { + sharedServiceLocator.getService(InterpreterSettingManager.class).close(); + } + sharedServiceLocator.getService(Notebook.class).close(); + Thread.sleep(3000); + } catch (Exception e) { + LOG.error("Error while stopping servlet container", e); + } + LOG.info("Bye"); + }); + } + private static Server setupJettyServer(ZeppelinConfiguration conf) { ThreadPool threadPool = new QueuedThreadPool(conf.getInt(ConfVars.ZEPPELIN_SERVER_JETTY_THREAD_POOL_MAX), @@ -332,6 +342,47 @@ public class ZeppelinServer extends ResourceConfig { server.addConnector(connector); } + private static void runNoteOnStart(ZeppelinConfiguration conf) throws IOException, InterruptedException { + String noteIdToRun = conf.getNotebookRunId(); + if (!Strings.isEmpty(noteIdToRun)) { + LOG.info("Running note {} on start", noteIdToRun); + NotebookService notebookService = (NotebookService) ServiceLocatorUtilities.getService( + sharedServiceLocator, NotebookService.class.getName()); + + ServiceContext serviceContext; + String base64EncodedJsonSerializedServiceContext = conf.getNotebookRunServiceContext(); + if (Strings.isEmpty(base64EncodedJsonSerializedServiceContext)) { + 
LOG.info("No service context provided. use ANONYMOUS"); + serviceContext = new ServiceContext(AuthenticationInfo.ANONYMOUS, new HashSet<String>() {}); + } else { + serviceContext = new Gson().fromJson( + new String(Base64.getDecoder().decode(base64EncodedJsonSerializedServiceContext)), + ServiceContext.class); + } + + boolean success = notebookService.runAllParagraphs(noteIdToRun, null, serviceContext, new ServiceCallback<Paragraph>() { + @Override + public void onStart(String message, ServiceContext context) throws IOException { + } + + @Override + public void onSuccess(Paragraph result, ServiceContext context) throws IOException { + } + + @Override + public void onFailure(Exception ex, ServiceContext context) throws IOException { + } + }); + + if (conf.getNotebookRunAutoShutdown()) { + Thread t = shutdown(conf); + t.start(); + t.join(); + System.exit(success ? 0 : 1); + } + } + } + private static void configureRequestHeaderSize( ZeppelinConfiguration conf, ServerConnector connector) { HttpConnectionFactory cf = diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java b/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java index 7ed3331..99ed6bb 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java @@ -22,6 +22,8 @@ package org.apache.zeppelin.service; import static org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_HOMESCREEN; import com.google.common.base.Strings; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; import java.io.IOException; import java.text.ParseException; import java.text.SimpleDateFormat; @@ -30,6 +32,8 @@ import java.util.List; import java.util.Map; import java.util.Set; import javax.inject.Inject; + +import java.util.stream.Collectors; import org.apache.commons.lang.StringUtils; import 
org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.display.AngularObject; @@ -82,6 +86,7 @@ public class NotebookService { private Notebook notebook; private AuthorizationService authorizationService; private SchedulerService schedulerService; + private Gson gson = new Gson(); @Inject public NotebookService( @@ -344,19 +349,35 @@ public class NotebookService { } } - public void runAllParagraphs(String noteId, + /** + * Run list of paragraphs. This method runs provided paragraphs one by one, synchronously. + * When a paragraph fails, subsequent paragraphs are not going to run and this method returns false. + * When list of paragraphs provided from argument is null, list of paragraphs stored in the Note will be used. + * + * @param noteId + * @param paragraphs list of paragraphs to run. List of paragraph stored in the Note will be used when null. + * @param context + * @param callback + * @return true when all paragraphs successfully run. false when any paragraph fails. 
+ * @throws IOException + */ + public boolean runAllParagraphs(String noteId, List<Map<String, Object>> paragraphs, ServiceContext context, ServiceCallback<Paragraph> callback) throws IOException { if (!checkPermission(noteId, Permission.RUNNER, Message.OP.RUN_ALL_PARAGRAPHS, context, callback)) { - return; + return false; } Note note = notebook.getNote(noteId); if (note == null) { callback.onFailure(new NoteNotFoundException(noteId), context); - return; + return false; + } + + if (paragraphs == null) { + paragraphs = gson.fromJson(gson.toJson(note.getParagraphs()), new TypeToken<List>(){}.getType()); } note.setRunning(true); @@ -366,7 +387,7 @@ public class NotebookService { if (paragraphId == null) { continue; } - String text = (String) raw.get("paragraph"); + String text = (String) raw.get("text"); String title = (String) raw.get("title"); Map<String, Object> params = (Map<String, Object>) raw.get("params"); Map<String, Object> config = (Map<String, Object>) raw.get("config"); @@ -374,12 +395,14 @@ public class NotebookService { if (!runParagraph(noteId, paragraphId, title, text, params, config, false, true, context, callback)) { // stop execution when one paragraph fails. 
- break; + return false; } } } finally { note.setRunning(false); } + + return true; } public void cancelParagraph(String noteId, diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java index f77edc2..e56b58a 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java @@ -361,20 +361,22 @@ public class NotebookServiceTest { // run all paragraphs reset(callback); + String textBefore = p.getText(); notebookService.runAllParagraphs( note1.getId(), gson.fromJson(gson.toJson(note1.getParagraphs()), new TypeToken<List>(){}.getType()), context, callback); verify(callback, times(2)).onSuccess(any(), any()); + assertEquals(textBefore, p.getText()); + + // run all paragraphs, with null paragraph list provided + reset(callback); + notebookService.runAllParagraphs( + note1.getId(), + null, + context, callback); + verify(callback, times(2)).onSuccess(any(), any()); - // run paragraph synchronously via invalid code - //TODO(zjffdu) must sleep for a while, otherwise will get wrong status. This should be due to - //bug of job component. - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - e.printStackTrace(); - } reset(callback); runStatus = notebookService.runParagraph(note1.getId(), p.getId(), "my_title", "invalid_code", new HashMap<>(), new HashMap<>(), false, true, context, callback);