This is an automated email from the ASF dual-hosted git repository. jongyoul pushed a commit to branch branch-0.9 in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.9 by this push: new 369b181 [ZEPPELIN-4924] Add params to note run rest api (#3827) 369b181 is described below commit 369b181a7eff3057b2fb948f4e1ef4f32e770e8d Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Mon Jun 29 09:51:35 2020 +0800 [ZEPPELIN-4924] Add params to note run rest api (#3827) --- .../org/apache/zeppelin/rest/NotebookRestApi.java | 33 ++++++----- ...rametersRequest.java => ParametersRequest.java} | 20 ++++--- .../apache/zeppelin/service/NotebookService.java | 3 +- .../apache/zeppelin/rest/NotebookRestApiTest.java | 65 ++++++++++++++++++++++ .../apache/zeppelin/rest/ZeppelinRestApiTest.java | 13 +++-- .../java/org/apache/zeppelin/notebook/Note.java | 28 ++++++---- .../zeppelin/notebook/scheduler/CronJob.java | 4 +- .../org/apache/zeppelin/notebook/NotebookTest.java | 10 ++-- 8 files changed, 130 insertions(+), 46 deletions(-) diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java index 615b789..dc5cb26 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java @@ -27,7 +27,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ExecutorService; import javax.inject.Inject; import javax.inject.Singleton; import javax.ws.rs.DELETE; @@ -43,10 +42,7 @@ import javax.ws.rs.core.Response.Status; import org.apache.commons.lang3.StringUtils; import org.apache.zeppelin.annotation.ZeppelinApi; import org.apache.zeppelin.conf.ZeppelinConfiguration; -import org.apache.zeppelin.interpreter.ExecutionContext; -import org.apache.zeppelin.interpreter.ExecutionContextBuilder; import org.apache.zeppelin.interpreter.InterpreterResult; -import org.apache.zeppelin.interpreter.InterpreterSetting; import org.apache.zeppelin.notebook.Note; import org.apache.zeppelin.notebook.NoteInfo; import org.apache.zeppelin.notebook.Notebook; @@ -61,9 +57,8 @@ import org.apache.zeppelin.rest.message.CronRequest; import org.apache.zeppelin.rest.message.NewNoteRequest; import org.apache.zeppelin.rest.message.NewParagraphRequest; import org.apache.zeppelin.rest.message.RenameNoteRequest; -import org.apache.zeppelin.rest.message.RunParagraphWithParametersRequest; +import org.apache.zeppelin.rest.message.ParametersRequest; import org.apache.zeppelin.rest.message.UpdateParagraphRequest; -import org.apache.zeppelin.scheduler.ExecutorFactory; import org.apache.zeppelin.search.SearchService; import org.apache.zeppelin.server.JsonResponse; import org.apache.zeppelin.service.AuthenticationService; @@ -657,6 +652,7 @@ public class NotebookRestApi extends AbstractRestApi { * @param noteId ID of Note * @param blocking blocking until jobs are done * @param isolated use isolated interpreter for running this note + * @param message any parameters passed to note * @return JSON with status.OK * @throws IOException * @throws IllegalArgumentException @@ -666,7 +662,8 @@ public class NotebookRestApi extends AbstractRestApi { @ZeppelinApi public Response runNoteJobs(@PathParam("noteId") String noteId, @QueryParam("blocking") Boolean blocking, - @QueryParam("isolated") Boolean isolated) + @QueryParam("isolated") Boolean isolated, + String message) throws IOException, IllegalArgumentException { if (blocking == null) { blocking = false; @@ -674,8 +671,14 @@ public class NotebookRestApi extends AbstractRestApi { if (isolated == null) { isolated = false; } + Map<String, Object> params = new HashMap<>(); + if (!StringUtils.isEmpty(message)) { + ParametersRequest request = + ParametersRequest.fromJson(message); + params = request.getParams(); + } - LOG.info("Run note jobs, noteId: {} blocking: {}, isolated: {}", noteId, blocking, isolated); + LOG.info("Run note jobs, noteId: {} blocking: {}, isolated: {}, params: {}", noteId, blocking, isolated, params); Note note = notebook.getNote(noteId); AuthenticationInfo subject = new AuthenticationInfo(authenticationService.getPrincipal()); subject.setRoles(new LinkedList<>(authenticationService.getAssociatedRoles())); @@ -684,7 +687,7 @@ public class NotebookRestApi extends AbstractRestApi { //TODO(zjffdu), can we run a note via rest api when cron is enabled ? try { - note.runAll(subject, blocking, isolated); + note.runAll(subject, blocking, isolated, params); return new JsonResponse<>(Status.OK).build(); } catch (Exception ex) { LOG.error("Exception from run", ex); @@ -790,8 +793,8 @@ public class NotebookRestApi extends AbstractRestApi { Map<String, Object> params = new HashMap<>(); if (!StringUtils.isEmpty(message)) { - RunParagraphWithParametersRequest request = - RunParagraphWithParametersRequest.fromJson(message); + ParametersRequest request = + ParametersRequest.fromJson(message); params = request.getParams(); } notebookService.runParagraph(noteId, paragraphId, paragraph.getTitle(), @@ -827,8 +830,8 @@ public class NotebookRestApi extends AbstractRestApi { Map<String, Object> params = new HashMap<>(); if (!StringUtils.isEmpty(message)) { - RunParagraphWithParametersRequest request = - RunParagraphWithParametersRequest.fromJson(message); + ParametersRequest request = + ParametersRequest.fromJson(message); params = request.getParams(); } @@ -1038,8 +1041,8 @@ public class NotebookRestApi extends AbstractRestApi { throws IOException { // handle params if presented if (!StringUtils.isEmpty(message)) { - RunParagraphWithParametersRequest request = - RunParagraphWithParametersRequest.fromJson(message); + ParametersRequest request = + ParametersRequest.fromJson(message); Map<String, Object> paramsForUpdating = request.getParams(); if (paramsForUpdating != null) { paragraph.settings.getParams().putAll(paramsForUpdating); diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/RunParagraphWithParametersRequest.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/ParametersRequest.java similarity index 70% rename from zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/RunParagraphWithParametersRequest.java rename to zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/ParametersRequest.java index be703da..77164b5 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/RunParagraphWithParametersRequest.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/ParametersRequest.java @@ -23,14 +23,18 @@ import java.util.Map; import org.apache.zeppelin.common.JsonSerializable; /** - * RunParagraphWithParametersRequest rest api request message. + * ParametersRequest rest api request message. */ -public class RunParagraphWithParametersRequest implements JsonSerializable { - private static final Gson gson = new Gson(); +public class ParametersRequest implements JsonSerializable { + private static final Gson GSON = new Gson(); - Map<String, Object> params; + private Map<String, Object> params; - public RunParagraphWithParametersRequest() { + public ParametersRequest() { + } + + public ParametersRequest(Map<String, Object> params) { + this.params = params; } public Map<String, Object> getParams() { @@ -38,10 +42,10 @@ public class RunParagraphWithParametersRequest implements JsonSerializable { } public String toJson() { - return gson.toJson(this); + return GSON.toJson(this); } - public static RunParagraphWithParametersRequest fromJson(String json) { - return gson.fromJson(json, RunParagraphWithParametersRequest.class); + public static ParametersRequest fromJson(String json) { + return GSON.fromJson(json, ParametersRequest.class); } } diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java b/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java index 1d4493c..71a67af 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java @@ -28,6 +28,7 @@ import java.io.IOException; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -419,7 +420,7 @@ public class NotebookService { } else { try { // run note directly when parameter `paragraphs` is null. - note.runAll(context.getAutheInfo(), true, false); + note.runAll(context.getAutheInfo(), true, false, new HashMap<>()); return true; } catch (Exception e) { LOGGER.warn("Fail to run note: " + note.getName(), e); diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookRestApiTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookRestApiTest.java index c4e898c..c05df07 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookRestApiTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookRestApiTest.java @@ -33,6 +33,7 @@ import org.apache.commons.httpclient.methods.PutMethod; import org.apache.zeppelin.interpreter.InterpreterSetting; import org.apache.zeppelin.interpreter.InterpreterSettingManager; import org.apache.zeppelin.notebook.Notebook; +import org.apache.zeppelin.rest.message.ParametersRequest; import org.apache.zeppelin.socket.NotebookServer; import org.apache.zeppelin.utils.TestUtils; import org.junit.AfterClass; @@ -43,6 +44,7 @@ import org.junit.Test; import org.junit.runners.MethodSorters; import java.io.IOException; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -374,6 +376,69 @@ public class NotebookRestApiTest extends AbstractTestRestApi { } @Test + public void testRunNoteWithParams() throws IOException, InterruptedException { + Note note1 = null; + try { + note1 = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous); + // 2 paragraphs + // P1: + // %python + // name = z.input('name', 'world') + // print(name) + // P2: + // %sh + // echo ${name|world} + // + Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS); + Paragraph p2 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS); + p1.setText("%python name = z.input('name', 'world')\nprint(name)"); + p2.setText("%sh echo '${name=world}'"); + + Map<String, Object> paramsMap = new HashMap<>(); + paramsMap.put("name", "zeppelin"); + ParametersRequest parametersRequest = new ParametersRequest(paramsMap); + PostMethod post = httpPost("/notebook/job/" + note1.getId() + "?blocking=false&isolated=true&", + parametersRequest.toJson()); + assertThat(post, isAllowed()); + Map<String, Object> resp = gson.fromJson(post.getResponseBodyAsString(), + new TypeToken<Map<String, Object>>() {}.getType()); + assertEquals(resp.get("status"), "OK"); + post.releaseConnection(); + + // wait for all the paragraphs are done + while(note1.isRunning()) { + Thread.sleep(1000); + } + assertEquals(Job.Status.FINISHED, p1.getStatus()); + assertEquals(Job.Status.FINISHED, p2.getStatus()); + assertEquals("zeppelin\n", p1.getReturn().message().get(0).getData()); + assertEquals("zeppelin\n", p2.getReturn().message().get(0).getData()); + + // another attempt rest api call without params + post = httpPost("/notebook/job/" + note1.getId() + "?blocking=false&isolated=true", ""); + assertThat(post, isAllowed()); + resp = gson.fromJson(post.getResponseBodyAsString(), + new TypeToken<Map<String, Object>>() {}.getType()); + assertEquals(resp.get("status"), "OK"); + post.releaseConnection(); + + // wait for all the paragraphs are done + while(note1.isRunning()) { + Thread.sleep(1000); + } + assertEquals(Job.Status.FINISHED, p1.getStatus()); + assertEquals(Job.Status.FINISHED, p2.getStatus()); + assertEquals("world\n", p1.getReturn().message().get(0).getData()); + assertEquals("world\n", p2.getReturn().message().get(0).getData()); + } finally { + // cleanup + if (null != note1) { + TestUtils.getInstance(Notebook.class).removeNote(note1.getId(), anonymous); + } + } + } + + @Test public void testRunAllParagraph_FirstFailed() throws IOException { Note note1 = null; try { diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java index 630b631..4a63eba 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java @@ -45,6 +45,7 @@ import org.junit.runners.MethodSorters; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -447,7 +448,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi { TestUtils.getInstance(Notebook.class).saveNote(note, anonymous); String noteId = note.getId(); - note.runAll(anonymous, true, false); + note.runAll(anonymous, true, false, new HashMap<>()); // wait until job is finished or timeout. int timeout = 1; while (!paragraph.isTerminated()) { @@ -509,7 +510,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi { TestUtils.getInstance(Notebook.class).saveNote(note, anonymous); String noteId = note.getId(); - note.runAll(anonymous, true, false); + note.runAll(anonymous, true, false, new HashMap<>()); // assume that status of the paragraph is running GetMethod get = httpGet("/notebook/job/" + noteId); assertThat("test get note job: ", get, isAllowed()); @@ -563,7 +564,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi { TestUtils.getInstance(Notebook.class).saveNote(note, anonymous); String noteId = note.getId(); - note.runAll(anonymous, true, false); + note.runAll(anonymous, true, false, new HashMap<>()); // Call Run paragraph REST API PostMethod postParagraph = httpPost("/notebook/job/" + noteId + "/" + paragraph.getId(), @@ -601,7 +602,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi { config.put("enabled", true); paragraph.setConfig(config); - note.runAll(AuthenticationInfo.ANONYMOUS, false, false); + note.runAll(AuthenticationInfo.ANONYMOUS, false, false, new HashMap<>()); String jsonRequest = "{\"cron\":\"* * * * * ?\" }"; // right cron expression but not exist note. @@ -651,7 +652,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi { config.put("enabled", true); paragraph.setConfig(config); - note.runAll(AuthenticationInfo.ANONYMOUS, false, false); + note.runAll(AuthenticationInfo.ANONYMOUS, false, false, new HashMap<>()); String jsonRequest = "{\"cron\":\"* * * * * ?\" }"; // right cron expression. @@ -663,7 +664,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi { System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_CRON_FOLDERS.getVarName(), "/System"); note.setName("System/test2"); - note.runAll(AuthenticationInfo.ANONYMOUS, false, false); + note.runAll(AuthenticationInfo.ANONYMOUS, false, false, new HashMap<>()); postCron = httpPost("/notebook/cron/" + note.getId(), jsonRequest); assertThat("", postCron, isAllowed()); postCron.releaseConnection(); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java index 03b9c75..ad353a8 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java @@ -764,13 +764,14 @@ public class Note implements JsonSerializable { */ public void runAll(AuthenticationInfo authInfo, boolean blocking, - boolean isolated) throws Exception { + boolean isolated, + Map<String, Object> params) throws Exception { if (blocking) { - runAllSync(authInfo, isolated); + runAllSync(authInfo, isolated, params); } else { ExecutorFactory.singleton().getNoteJobExecutor().submit(() -> { try { - runAllSync(authInfo, isolated); + runAllSync(authInfo, isolated, params); } catch (Exception e) { LOGGER.warn("Fail to run note: " + id, e); } @@ -784,7 +785,7 @@ public class Note implements JsonSerializable { * @param authInfo * @param isolated */ - private void runAllSync(AuthenticationInfo authInfo, boolean isolated) throws Exception { + private void runAllSync(AuthenticationInfo authInfo, boolean isolated, Map<String, Object> params) throws Exception { setIsolatedMode(isolated); setRunning(true); setStartTime(DATE_TIME_FORMATTER.format(LocalDateTime.now())); @@ -794,7 +795,11 @@ public class Note implements JsonSerializable { continue; } p.setAuthenticationInfo(authInfo); + Map<String, Object> originalParams = p.settings.getParams(); try { + if (params != null && !params.isEmpty()) { + p.settings.setParams(params); + } Interpreter interpreter = p.getBindedInterpreter(); if (interpreter != null) { // set interpreter property to execution.mode to be note @@ -802,14 +807,17 @@ public class Note implements JsonSerializable { interpreter.setProperty(".execution.mode", "note"); interpreter.setProperty(".noteId", id); } + // Must run each paragraph in blocking way. + if (!run(p.getId(), true)) { + LOGGER.warn("Skip running the remain notes because paragraph {} fails", p.getId()); + throw new Exception("Fail to run note because paragraph " + p.getId() + " is failed, result: " + + p.getReturn()); + } } catch (InterpreterNotFoundException e) { // ignore, because the following run method will fail if interpreter not found. - } - // Must run each paragraph in blocking way. - if (!run(p.getId(), true)) { - LOGGER.warn("Skip running the remain notes because paragraph {} fails", p.getId()); - throw new Exception("Fail to run note because paragraph " + p.getId() + " is failed, result: " + - p.getReturn()); + } finally { + // reset params to the original value + p.settings.setParams(originalParams); } } } catch (Exception e) { diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJob.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJob.java index 95d08d3..c03ee7c 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJob.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJob.java @@ -28,6 +28,8 @@ import org.quartz.JobExecutionContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashMap; + /** Cron task for the note. */ public class CronJob implements org.quartz.Job { private static final Logger LOGGER = LoggerFactory.getLogger(CronJob.class); @@ -56,7 +58,7 @@ public class CronJob implements org.quartz.Job { StringUtils.isEmpty(cronExecutingRoles) ? null : cronExecutingRoles, null); try { - note.runAll(authenticationInfo, true, true); + note.runAll(authenticationInfo, true, true, new HashMap<>()); } catch (Exception e) { LOGGER.warn("Fail to run note: " + note.getName(), e); } diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java index aeb9a8e..d995664 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java @@ -475,7 +475,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo p3.setText("%mock1 p3"); // when - note.runAll(anonymous, true, false); + note.runAll(anonymous, true, false, new HashMap<>()); assertEquals("repl1: p1", p1.getReturn().message().get(0).getData()); assertNull(p2.getReturn()); @@ -829,7 +829,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo String simpleText = "hello world"; p.setText(simpleText); - note.runAll(anonymous, true, false); + note.runAll(anonymous, true, false, new HashMap<>()); String exportedNoteJson = notebook.exportNote(note.getId()); @@ -861,7 +861,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo final Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); p.setText("hello world"); - note.runAll(anonymous, true, false); + note.runAll(anonymous, true, false, new HashMap<>()); p.setStatus(Status.RUNNING); Note cloneNote = notebook.cloneNote(note.getId(), "clone note", anonymous); @@ -897,7 +897,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo for (InterpreterGroup intpGroup : interpreterSettingManager.getAllInterpreterGroup()) { intpGroup.setResourcePool(new LocalResourcePool(intpGroup.getId())); } - note.runAll(anonymous, true, false); + note.runAll(anonymous, true, false, new HashMap<>()); assertEquals(2, interpreterSettingManager.getAllResources().size()); @@ -1166,7 +1166,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo p3.setText("%mock1 sleep 1000"); - note.runAll(AuthenticationInfo.ANONYMOUS, false, false); + note.runAll(AuthenticationInfo.ANONYMOUS, false, false, new HashMap<>()); // wait until first paragraph finishes and second paragraph starts while (p1.getStatus() != Status.FINISHED || p2.getStatus() != Status.RUNNING) Thread.yield();