This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new b1373e0 [ZEPPELIN-4928] Refactoring of NotebookRestApi b1373e0 is described below commit b1373e01ab04d12762f26e7f2543db15bd693544 Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Tue Jun 30 14:07:10 2020 +0800 [ZEPPELIN-4928] Refactoring of NotebookRestApi ### What is this PR for? Several improvements: * Move note and paragraph job status from Note to `NoteJobStatus` & `ParagraphJobStatus` * Throw an exception when running a note while it is still in the running state ### What type of PR is it? [Improvement | Refactoring] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-4928 ### How should this be tested? * CI pass ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #3839 from zjffdu/ZEPPELIN-4928 and squashes the following commits: 550ff8561 [Jeff Zhang] [ZEPPELIN-4928] Refactoring of NotebookRestApi --- .../org/apache/zeppelin/rest/NotebookRestApi.java | 125 ++++++++++++--------- .../zeppelin/rest/message/NoteJobStatus.java | 50 +++++++++ .../zeppelin/rest/message/ParagraphJobStatus.java | 66 +++++++++++ .../apache/zeppelin/service/NotebookService.java | 30 ++--- .../apache/zeppelin/rest/ZeppelinRestApiTest.java | 22 ++-- .../interpreter/ManagedInterpreterGroup.java | 8 +- .../java/org/apache/zeppelin/notebook/Note.java | 43 +------ 7 files changed, 225 insertions(+), 119 deletions(-) diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java index b4f136c..13c1b99 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java @@ -56,6 +56,8 @@ import 
org.apache.zeppelin.rest.exception.ParagraphNotFoundException; import org.apache.zeppelin.rest.message.CronRequest; import org.apache.zeppelin.rest.message.NewNoteRequest; import org.apache.zeppelin.rest.message.NewParagraphRequest; +import org.apache.zeppelin.rest.message.NoteJobStatus; +import org.apache.zeppelin.rest.message.ParagraphJobStatus; import org.apache.zeppelin.rest.message.RenameNoteRequest; import org.apache.zeppelin.rest.message.ParametersRequest; import org.apache.zeppelin.rest.message.UpdateParagraphRequest; @@ -78,8 +80,8 @@ import org.slf4j.LoggerFactory; @Produces("application/json") @Singleton public class NotebookRestApi extends AbstractRestApi { - private static final Logger LOG = LoggerFactory.getLogger(NotebookRestApi.class); - private static Gson gson = new Gson(); + private static final Logger LOGGER = LoggerFactory.getLogger(NotebookRestApi.class); + private static final Gson GSON = new Gson(); private ZeppelinConfiguration zConf; private Notebook notebook; @@ -133,7 +135,7 @@ public class NotebookRestApi extends AbstractRestApi { } private String ownerPermissionError(Set<String> current, Set<String> allowed) { - LOG.info("Cannot change permissions. Connection owners {}. Allowed owners {}", + LOGGER.info("Cannot change permissions. Connection owners {}. 
Allowed owners {}", current.toString(), allowed.toString()); return "Insufficient privileges to change permissions.\n\n" + "Allowed owners: " + allowed.toString() + "\n\n" + @@ -156,7 +158,7 @@ public class NotebookRestApi extends AbstractRestApi { private void checkIfUserIsAnon(String errorMsg) { boolean isAuthenticated = authenticationService.isAuthenticated(); if (isAuthenticated && authenticationService.getPrincipal().equals("anonymous")) { - LOG.info("Anonymous user cannot set any permissions for this note."); + LOGGER.info("Anonymous user cannot set any permissions for this note."); throw new ForbiddenException(errorMsg); } } @@ -217,7 +219,7 @@ public class NotebookRestApi extends AbstractRestApi { private void checkIfNoteSupportsCron(Note note) { if (!note.isCronSupported(notebook.getConf())) { - LOG.error("Cron is not enabled from Zeppelin server"); + LOGGER.error("Cron is not enabled from Zeppelin server"); throw new ForbiddenException("Cron is not enabled from Zeppelin server"); } } @@ -247,18 +249,19 @@ public class NotebookRestApi extends AbstractRestApi { ownerPermissionError(userAndRoles, authorizationService.getOwners(noteId))); HashMap<String, HashSet<String>> permMap = - gson.fromJson(req, new TypeToken<HashMap<String, HashSet<String>>>() { + GSON.fromJson(req, new TypeToken<HashMap<String, HashSet<String>>>() { }.getType()); Note note = notebook.getNote(noteId); checkIfNoteIsNotNull(note); - LOG.info("Set permissions {} {} {} {} {} {}", noteId, principal, permMap.get("owners"), - permMap.get("readers"), permMap.get("runners"), permMap.get("writers")); - HashSet<String> readers = permMap.get("readers"); HashSet<String> runners = permMap.get("runners"); HashSet<String> owners = permMap.get("owners"); HashSet<String> writers = permMap.get("writers"); + + LOGGER.info("Set permissions to note: {} with current user:{}, owners:{}, readers:{}, runners:{}, writers:{}", + noteId, principal, owners, readers, runners, writers); + // Set readers, if runners, 
writers and owners is empty -> set to user requesting the change if (readers != null && !readers.isEmpty()) { if (runners.isEmpty()) { @@ -291,7 +294,7 @@ public class NotebookRestApi extends AbstractRestApi { authorizationService.setRunners(noteId, runners); authorizationService.setWriters(noteId, writers); authorizationService.setOwners(noteId, owners); - LOG.debug("After set permissions {} {} {} {}", authorizationService.getOwners(noteId), + LOGGER.debug("After set permissions {} {} {} {}", authorizationService.getOwners(noteId), authorizationService.getReaders(noteId), authorizationService.getRunners(noteId), authorizationService.getWriters(noteId)); AuthenticationInfo subject = new AuthenticationInfo(authenticationService.getPrincipal()); @@ -301,6 +304,12 @@ public class NotebookRestApi extends AbstractRestApi { return new JsonResponse<>(Status.OK).build(); } + /** + * Return noteinfo list for the current user who has reader permission. + * + * @return + * @throws IOException + */ @GET @ZeppelinApi public Response getNoteList() throws IOException { @@ -309,6 +318,13 @@ public class NotebookRestApi extends AbstractRestApi { return new JsonResponse<>(Status.OK, "", notesInfo).build(); } + /** + * Get note of this specified noteId. + * + * @param noteId + * @return + * @throws IOException + */ @GET @Path("{noteId}") @ZeppelinApi @@ -319,7 +335,7 @@ public class NotebookRestApi extends AbstractRestApi { } /** - * export note REST API. + * Export note REST API. * * @param noteId ID of Note * @return note JSON with status.OK @@ -335,7 +351,8 @@ public class NotebookRestApi extends AbstractRestApi { } /** - * import new note REST API. + * Import new note REST API. + * TODO(zjffdu) support to import jupyter note. * * @param noteJson - note Json * @return JSON with new note ID @@ -351,7 +368,7 @@ public class NotebookRestApi extends AbstractRestApi { } /** - * Create new note REST API. + * Create new note REST API with note json. 
* * @param message - JSON with new note name * @return JSON with new note ID @@ -361,7 +378,7 @@ public class NotebookRestApi extends AbstractRestApi { @ZeppelinApi public Response createNote(String message) throws IOException { String user = authenticationService.getPrincipal(); - LOG.info("Create new note by JSON {}", message); + LOGGER.info("Create new note by JSON {}", message); NewNoteRequest request = NewNoteRequest.fromJson(message); Note note = notebookService.createNote( request.getName(), @@ -389,7 +406,7 @@ public class NotebookRestApi extends AbstractRestApi { @Path("{noteId}") @ZeppelinApi public Response deleteNote(@PathParam("noteId") String noteId) throws IOException { - LOG.info("Delete note {} ", noteId); + LOGGER.info("Delete note {} ", noteId); notebookService.removeNote(noteId, getServiceContext(), new RestServiceCallback<String>() { @@ -416,7 +433,7 @@ public class NotebookRestApi extends AbstractRestApi { @ZeppelinApi public Response cloneNote(@PathParam("noteId") String noteId, String message) throws IOException, IllegalArgumentException { - LOG.info("clone note by JSON {}", message); + LOGGER.info("Clone note by JSON {}", message); checkIfUserCanWrite(noteId, "Insufficient privileges you cannot clone this note"); NewNoteRequest request = NewNoteRequest.fromJson(message); String newNoteName = null; @@ -447,11 +464,11 @@ public class NotebookRestApi extends AbstractRestApi { @ZeppelinApi public Response renameNote(@PathParam("noteId") String noteId, String message) throws IOException { - LOG.info("rename note by JSON {}", message); - RenameNoteRequest request = gson.fromJson(message, RenameNoteRequest.class); + LOGGER.info("Rename note by JSON {}", message); + RenameNoteRequest request = GSON.fromJson(message, RenameNoteRequest.class); String newName = request.getName(); if (newName.isEmpty()) { - LOG.warn("Trying to rename notebook {} with empty name parameter", noteId); + LOGGER.warn("Trying to rename notebook {} with empty name parameter", 
noteId); throw new BadRequestException("name can not be empty"); } notebookService.renameNote(noteId, request.getName(), false, getServiceContext(), @@ -478,7 +495,7 @@ public class NotebookRestApi extends AbstractRestApi { public Response insertParagraph(@PathParam("noteId") String noteId, String message) throws IOException { String user = authenticationService.getPrincipal(); - LOG.info("insert paragraph {} {}", noteId, message); + LOGGER.info("Insert paragraph {} {}", noteId, message); Note note = notebook.getNote(noteId); checkIfNoteIsNotNull(note); @@ -511,7 +528,7 @@ public class NotebookRestApi extends AbstractRestApi { @ZeppelinApi public Response getParagraph(@PathParam("noteId") String noteId, @PathParam("paragraphId") String paragraphId) throws IOException { - LOG.info("get paragraph {} {}", noteId, paragraphId); + LOGGER.info("Get paragraph {} {}", noteId, paragraphId); Note note = notebook.getNote(noteId); checkIfNoteIsNotNull(note); @@ -523,7 +540,7 @@ public class NotebookRestApi extends AbstractRestApi { } /** - * Update paragraph. + * Update paragraph. Only update title and text is supported. * * @param message json containing the "text" and optionally the "title" of the paragraph, e.g. 
* {"text" : "updated text", "title" : "Updated title" } @@ -535,7 +552,7 @@ public class NotebookRestApi extends AbstractRestApi { @PathParam("paragraphId") String paragraphId, String message) throws IOException { String user = authenticationService.getPrincipal(); - LOG.info("{} will update paragraph {} {}", user, noteId, paragraphId); + LOGGER.info("{} will update paragraph {} {}", user, noteId, paragraphId); Note note = notebook.getNote(noteId); checkIfNoteIsNotNull(note); @@ -543,7 +560,7 @@ public class NotebookRestApi extends AbstractRestApi { Paragraph p = note.getParagraph(paragraphId); checkIfParagraphIsNotNull(p); - UpdateParagraphRequest updatedParagraph = gson.fromJson(message, UpdateParagraphRequest.class); + UpdateParagraphRequest updatedParagraph = GSON.fromJson(message, UpdateParagraphRequest.class); p.setText(updatedParagraph.getText()); if (updatedParagraph.getTitle() != null) { @@ -556,6 +573,15 @@ public class NotebookRestApi extends AbstractRestApi { return new JsonResponse<>(Status.OK, "").build(); } + /** + * Update paragraph config rest api. 
+ * + * @param noteId + * @param paragraphId + * @param message + * @return + * @throws IOException + */ @PUT @Path("{noteId}/paragraph/{paragraphId}/config") @ZeppelinApi @@ -563,7 +589,7 @@ public class NotebookRestApi extends AbstractRestApi { @PathParam("paragraphId") String paragraphId, String message) throws IOException { String user = authenticationService.getPrincipal(); - LOG.info("{} will update paragraph config {} {}", user, noteId, paragraphId); + LOGGER.info("{} will update paragraph config {} {}", user, noteId, paragraphId); Note note = notebook.getNote(noteId); checkIfNoteIsNotNull(note); @@ -571,7 +597,7 @@ public class NotebookRestApi extends AbstractRestApi { Paragraph p = note.getParagraph(paragraphId); checkIfParagraphIsNotNull(p); - Map<String, Object> newConfig = gson.fromJson(message, HashMap.class); + Map<String, Object> newConfig = GSON.fromJson(message, HashMap.class); configureParagraph(p, newConfig, user); AuthenticationInfo subject = new AuthenticationInfo(user); notebook.saveNote(note, subject); @@ -592,7 +618,7 @@ public class NotebookRestApi extends AbstractRestApi { @PathParam("paragraphId") String paragraphId, @PathParam("newIndex") String newIndex) throws IOException { - LOG.info("move paragraph {} {} {}", noteId, paragraphId, newIndex); + LOGGER.info("Move paragraph {} {} {}", noteId, paragraphId, newIndex); notebookService.moveParagraph(noteId, paragraphId, Integer.parseInt(newIndex), getServiceContext(), new RestServiceCallback<Paragraph>() { @@ -617,7 +643,7 @@ public class NotebookRestApi extends AbstractRestApi { @ZeppelinApi public Response deleteParagraph(@PathParam("noteId") String noteId, @PathParam("paragraphId") String paragraphId) throws IOException { - LOG.info("delete paragraph {} {}", noteId, paragraphId); + LOGGER.info("Delete paragraph {} {}", noteId, paragraphId); notebookService.removeParagraph(noteId, paragraphId, getServiceContext(), new RestServiceCallback<Paragraph>() { @Override @@ -640,7 +666,7 @@ public 
class NotebookRestApi extends AbstractRestApi { @ZeppelinApi public Response clearAllParagraphOutput(@PathParam("noteId") String noteId) throws IOException { - LOG.info("clear all paragraph output of note {}", noteId); + LOGGER.info("Clear all paragraph output of note {}", noteId); notebookService.clearAllParagraphOutput(noteId, getServiceContext(), new RestServiceCallback<>()); return new JsonResponse(Status.OK, "").build(); @@ -678,7 +704,7 @@ public class NotebookRestApi extends AbstractRestApi { params = request.getParams(); } - LOG.info("Run note jobs, noteId: {}, blocking: {}, isolated: {}, params: {}", noteId, blocking, isolated, params); + LOGGER.info("Run note jobs, noteId: {}, blocking: {}, isolated: {}, params: {}", noteId, blocking, isolated, params); Note note = notebook.getNote(noteId); AuthenticationInfo subject = new AuthenticationInfo(authenticationService.getPrincipal()); subject.setRoles(new LinkedList<>(authenticationService.getAssociatedRoles())); @@ -690,7 +716,7 @@ public class NotebookRestApi extends AbstractRestApi { note.runAll(subject, blocking, isolated, params); return new JsonResponse<>(Status.OK).build(); } catch (Exception ex) { - LOG.error("Exception from run", ex); + LOGGER.error("Exception from run", ex); return new JsonResponse<>(Status.EXPECTATION_FAILED, ex.getMessage()).build(); } } @@ -708,7 +734,7 @@ public class NotebookRestApi extends AbstractRestApi { @ZeppelinApi public Response stopNoteJobs(@PathParam("noteId") String noteId) throws IOException, IllegalArgumentException { - LOG.info("stop note jobs {} ", noteId); + LOGGER.info("Stop note jobs {} ", noteId); Note note = notebook.getNote(noteId); checkIfNoteIsNotNull(note); checkIfUserCanRun(noteId, "Insufficient privileges you cannot stop this job for this note"); @@ -734,12 +760,12 @@ public class NotebookRestApi extends AbstractRestApi { @ZeppelinApi public Response getNoteJobStatus(@PathParam("noteId") String noteId) throws IOException, IllegalArgumentException { - 
LOG.info("get note job status."); + LOGGER.info("Get note job status."); Note note = notebook.getNote(noteId); checkIfNoteIsNotNull(note); checkIfUserCanRead(noteId, "Insufficient privileges you cannot get job status"); - return new JsonResponse<>(Status.OK, null, note.generateParagraphsInfo()).build(); + return new JsonResponse<>(Status.OK, null, new NoteJobStatus(note)).build(); } /** @@ -757,7 +783,7 @@ public class NotebookRestApi extends AbstractRestApi { public Response getNoteParagraphJobStatus(@PathParam("noteId") String noteId, @PathParam("paragraphId") String paragraphId) throws IOException, IllegalArgumentException { - LOG.info("get note paragraph job status."); + LOGGER.info("Get note paragraph job status."); Note note = notebook.getNote(noteId); checkIfNoteIsNotNull(note); checkIfUserCanRead(noteId, "Insufficient privileges you cannot get job status"); @@ -765,8 +791,7 @@ public class NotebookRestApi extends AbstractRestApi { Paragraph paragraph = note.getParagraph(paragraphId); checkIfParagraphIsNotNull(paragraph); - return new JsonResponse<>(Status.OK, null, note.generateSingleParagraphInfo(paragraphId)). 
- build(); + return new JsonResponse<>(Status.OK, null, new ParagraphJobStatus(paragraph)).build(); } /** @@ -784,7 +809,7 @@ public class NotebookRestApi extends AbstractRestApi { public Response runParagraph(@PathParam("noteId") String noteId, @PathParam("paragraphId") String paragraphId, String message) throws IOException, IllegalArgumentException { - LOG.info("run paragraph job asynchronously {} {} {}", noteId, paragraphId, message); + LOGGER.info("Run paragraph job asynchronously {} {} {}", noteId, paragraphId, message); Note note = notebook.getNote(noteId); checkIfNoteIsNotNull(note); @@ -821,7 +846,7 @@ public class NotebookRestApi extends AbstractRestApi { @PathParam("paragraphId") String paragraphId, String message) throws IOException, IllegalArgumentException { - LOG.info("run paragraph synchronously {} {} {}", noteId, paragraphId, message); + LOGGER.info("Run paragraph synchronously {} {} {}", noteId, paragraphId, message); Note note = notebook.getNote(noteId); checkIfNoteIsNotNull(note); @@ -866,7 +891,7 @@ public class NotebookRestApi extends AbstractRestApi { public Response cancelParagraph(@PathParam("noteId") String noteId, @PathParam("paragraphId") String paragraphId) throws IOException, IllegalArgumentException { - LOG.info("stop paragraph job {} ", noteId); + LOGGER.info("stop paragraph job {} ", noteId); notebookService.cancelParagraph(noteId, paragraphId, getServiceContext(), new RestServiceCallback<Paragraph>()); return new JsonResponse<>(Status.OK).build(); @@ -885,7 +910,7 @@ public class NotebookRestApi extends AbstractRestApi { @ZeppelinApi public Response registerCronJob(@PathParam("noteId") String noteId, String message) throws IOException, IllegalArgumentException { - LOG.info("Register cron job note={} request cron msg={}", noteId, message); + LOGGER.info("Register cron job note={} request cron msg={}", noteId, message); CronRequest request = CronRequest.fromJson(message); @@ -920,7 +945,7 @@ public class NotebookRestApi extends 
AbstractRestApi { @ZeppelinApi public Response removeCronJob(@PathParam("noteId") String noteId) throws IOException, IllegalArgumentException { - LOG.info("Remove cron job note {}", noteId); + LOGGER.info("Remove cron job note {}", noteId); Note note = notebook.getNote(noteId); checkIfNoteIsNotNull(note); @@ -950,7 +975,7 @@ public class NotebookRestApi extends AbstractRestApi { @ZeppelinApi public Response getCronJob(@PathParam("noteId") String noteId) throws IOException, IllegalArgumentException { - LOG.info("Get cron job note {}", noteId); + LOGGER.info("Get cron job note {}", noteId); Note note = notebook.getNote(noteId); checkIfNoteIsNotNull(note); @@ -974,7 +999,7 @@ public class NotebookRestApi extends AbstractRestApi { @Path("jobmanager/") @ZeppelinApi public Response getJobListforNote() throws IOException, IllegalArgumentException { - LOG.info("Get note jobs for job manager"); + LOGGER.info("Get note jobs for job manager"); List<JobManagerService.NoteJobInfo> noteJobs = jobManagerService .getNoteJobInfoByUnixTime(0, getServiceContext(), new RestServiceCallback<>()); Map<String, Object> response = new HashMap<>(); @@ -997,7 +1022,7 @@ public class NotebookRestApi extends AbstractRestApi { @ZeppelinApi public Response getUpdatedJobListforNote(@PathParam("lastUpdateUnixtime") long lastUpdateUnixTime) throws IOException, IllegalArgumentException { - LOG.info("Get updated note jobs lastUpdateTime {}", lastUpdateUnixTime); + LOGGER.info("Get updated note jobs lastUpdateTime {}", lastUpdateUnixTime); List<JobManagerService.NoteJobInfo> noteJobs = jobManagerService.getNoteJobInfoByUnixTime(lastUpdateUnixTime, getServiceContext(), new RestServiceCallback<>()); @@ -1014,7 +1039,7 @@ public class NotebookRestApi extends AbstractRestApi { @Path("search") @ZeppelinApi public Response search(@QueryParam("q") String queryTerm) { - LOG.info("Searching notes for: {}", queryTerm); + LOGGER.info("Searching notes for: {}", queryTerm); String principal = 
authenticationService.getPrincipal(); Set<String> roles = authenticationService.getAssociatedRoles(); HashSet<String> userAndRoles = new HashSet<>(); @@ -1032,7 +1057,7 @@ public class NotebookRestApi extends AbstractRestApi { i--; } } - LOG.info("{} notes found", notesFound.size()); + LOGGER.info("{} notes found", notesFound.size()); return new JsonResponse<>(Status.OK, notesFound).build(); } @@ -1053,7 +1078,7 @@ public class NotebookRestApi extends AbstractRestApi { } private void initParagraph(Paragraph p, NewParagraphRequest request, String user) { - LOG.info("Init Paragraph for user {}", user); + LOGGER.info("Init Paragraph for user {}", user); checkIfParagraphIsNotNull(p); p.setTitle(request.getTitle()); p.setText(request.getText()); @@ -1064,9 +1089,9 @@ public class NotebookRestApi extends AbstractRestApi { } private void configureParagraph(Paragraph p, Map<String, Object> newConfig, String user) { - LOG.info("Configure Paragraph for user {}", user); + LOGGER.info("Configure Paragraph for user {}", user); if (newConfig == null || newConfig.isEmpty()) { - LOG.warn("{} is trying to update paragraph {} of note {} with empty config", + LOGGER.warn("{} is trying to update paragraph {} of note {} with empty config", user, p.getId(), p.getNote().getId()); throw new BadRequestException("paragraph config cannot be empty"); } diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/NoteJobStatus.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/NoteJobStatus.java new file mode 100644 index 0000000..27d9026 --- /dev/null +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/NoteJobStatus.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.rest.message; + +import com.google.gson.Gson; +import com.google.gson.annotations.SerializedName; +import org.apache.zeppelin.notebook.Note; + +import java.util.List; +import java.util.stream.Collectors; + +public class NoteJobStatus { + private static final Gson GSON = new Gson(); + + private String id; + private boolean isRunning; + @SerializedName("paragraphs") + private List<ParagraphJobStatus> paragraphJobStatusList; + + public NoteJobStatus(Note note) { + this.id = note.getId(); + this.isRunning = note.isRunning(); + this.paragraphJobStatusList = note.getParagraphs().stream() + .map(p -> new ParagraphJobStatus(p)) + .collect(Collectors.toList()); + } + + public List<ParagraphJobStatus> getParagraphJobStatusList() { + return paragraphJobStatusList; + } + + public static NoteJobStatus fromJson(String json) { + return GSON.fromJson(json, NoteJobStatus.class); + } +} diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/ParagraphJobStatus.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/ParagraphJobStatus.java new file mode 100644 index 0000000..8249524 --- /dev/null +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/ParagraphJobStatus.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.rest.message; + +import org.apache.zeppelin.notebook.Paragraph; + +public class ParagraphJobStatus { + private String id; + private String status; + private String started; + private String finished; + private String progress; + + public ParagraphJobStatus(Paragraph p) { + this.id = p.getId(); + this.status = p.getStatus().toString(); + if (p.getDateStarted() != null) { + this.started = p.getDateStarted().toString(); + } + if (p.getDateFinished() != null) { + this.finished = p.getDateFinished().toString(); + } + if (p.getStatus().isRunning()) { + this.progress = String.valueOf(p.progress()); + } else if (p.isTerminated()){ + this.progress = String.valueOf(100); + } else { + this.progress = String.valueOf(0); + } + } + + public String getId() { + return id; + } + + public String getStatus() { + return status; + } + + public String getStarted() { + return started; + } + + public String getFinished() { + return finished; + } + + public String getProgress() { + return progress; + } +} diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java b/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java index c73eaf5..a36a8d0 100644 --- 
a/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java @@ -393,10 +393,10 @@ public class NotebookService { return false; } - note.setRunning(true); - try { - if (paragraphs != null) { - // run note via the data passed from frontend + if (paragraphs != null) { + // run note via the data passed from frontend + try { + note.setRunning(true); for (Map<String, Object> raw : paragraphs) { String paragraphId = (String) raw.get("id"); if (paragraphId == null) { @@ -418,18 +418,18 @@ public class NotebookService { throw new IOException("Fail to run paragraph json: " + raw); } } - } else { - try { - // run note directly when parameter `paragraphs` is null. - note.runAll(context.getAutheInfo(), true, false, new HashMap<>()); - return true; - } catch (Exception e) { - LOGGER.warn("Fail to run note: " + note.getName(), e); - return false; - } + } finally { + note.setRunning(false); + } + } else { + try { + // run note directly when parameter `paragraphs` is null. 
+ note.runAll(context.getAutheInfo(), true, false, new HashMap<>()); + return true; + } catch (Exception e) { + LOGGER.warn("Fail to run note: " + note.getName(), e); + return false; } - } finally { - note.setRunning(false); } return true; diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java index 65faed8..6e87196 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java @@ -34,6 +34,7 @@ import org.apache.commons.httpclient.methods.PutMethod; import org.apache.commons.lang3.StringUtils; import org.apache.zeppelin.notebook.AuthorizationService; import org.apache.zeppelin.notebook.Notebook; +import org.apache.zeppelin.rest.message.NoteJobStatus; import org.apache.zeppelin.service.AuthenticationService; import org.apache.zeppelin.utils.TestUtils; import org.junit.AfterClass; @@ -521,10 +522,9 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi { Map<String, Object> resp = gson.fromJson(responseBody, new TypeToken<Map<String, Object>>() {}.getType()); - List<Map<String, Object>> paragraphs = (List<Map<String, Object>>) resp.get("body"); - assertEquals(1, paragraphs.size()); - assertTrue(paragraphs.get(0).containsKey("progress")); - int progress = Integer.parseInt((String) paragraphs.get(0).get("progress")); + NoteJobStatus noteJobStatus = NoteJobStatus.fromJson(gson.toJson(resp.get("body"))); + assertEquals(1, noteJobStatus.getParagraphJobStatusList().size()); + int progress = Integer.parseInt(noteJobStatus.getParagraphJobStatusList().get(0).getProgress()); assertTrue(progress >= 0 && progress <= 100); // wait until job is finished or timeout. 
@@ -652,7 +652,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi { config.put("enabled", true); paragraph.setConfig(config); - note.runAll(AuthenticationInfo.ANONYMOUS, false, false, new HashMap<>()); + note.runAll(AuthenticationInfo.ANONYMOUS, true, true, new HashMap<>()); String jsonRequest = "{\"cron\":\"* * * * * ?\" }"; // right cron expression. @@ -664,7 +664,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi { System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_CRON_FOLDERS.getVarName(), "/System"); note.setName("System/test2"); - note.runAll(AuthenticationInfo.ANONYMOUS, false, false, new HashMap<>()); + note.runAll(AuthenticationInfo.ANONYMOUS, true, true, new HashMap<>()); postCron = httpPost("/notebook/cron/" + note.getId(), jsonRequest); assertThat("", postCron, isAllowed()); postCron.releaseConnection(); @@ -687,7 +687,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi { } @Test - public void testRegressionZEPPELIN_527() throws IOException { + public void testRegressionZEPPELIN_527() throws Exception { Note note = null; try { note = TestUtils.getInstance(Notebook.class).createNote("note1_testRegressionZEPPELIN_527", anonymous); @@ -695,16 +695,16 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi { note.setName("note for run test"); Paragraph paragraph = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); paragraph.setText("%spark\nval param = z.input(\"param\").toString\nprintln(param)"); - + note.runAll(AuthenticationInfo.ANONYMOUS, true, false, new HashMap<>()); TestUtils.getInstance(Notebook.class).saveNote(note, anonymous); GetMethod getNoteJobs = httpGet("/notebook/job/" + note.getId()); assertThat("test note jobs run:", getNoteJobs, isAllowed()); Map<String, Object> resp = gson.fromJson(getNoteJobs.getResponseBodyAsString(), new TypeToken<Map<String, Object>>() {}.getType()); - List<Map<String, String>> body = (List<Map<String, String>>) resp.get("body"); - 
assertFalse(body.get(0).containsKey("started")); - assertFalse(body.get(0).containsKey("finished")); + NoteJobStatus noteJobStatus = NoteJobStatus.fromJson(gson.toJson(resp.get("body"))); + assertNotNull(noteJobStatus.getParagraphJobStatusList().get(0).getStarted()); + assertNotNull(noteJobStatus.getParagraphJobStatusList().get(0).getFinished()); getNoteJobs.releaseConnection(); } finally { //cleanup diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java index a27677f..a1a478e 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java @@ -147,9 +147,11 @@ public class ManagedInterpreterGroup extends InterpreterGroup { if (Boolean.parseBoolean( interpreter.getProperty("zeppelin.interpreter.close.cancel_job", "true"))) { for (final Job job : scheduler.getAllJobs()) { - job.abort(); - job.setStatus(Job.Status.ABORT); - LOGGER.info("Job " + job.getJobName() + " aborted "); + if (!job.isTerminated()) { + job.abort(); + job.setStatus(Job.Status.ABORT); + LOGGER.info("Job " + job.getJobName() + " aborted "); + } } } else { LOGGER.info("Keep job running while closing interpreter: " + interpreter.getClassName()); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java index 882b368..447bd2d 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java @@ -703,46 +703,6 @@ public class Note implements JsonSerializable { } } - public List<Map<String, String>> generateParagraphsInfo() { - List<Map<String, String>> paragraphsInfo = new LinkedList<>(); - synchronized (paragraphs) { - for (Paragraph p : 
paragraphs) { - Map<String, String> info = populateParagraphInfo(p); - paragraphsInfo.add(info); - } - } - return paragraphsInfo; - } - - public Map<String, String> generateSingleParagraphInfo(String paragraphId) { - synchronized (paragraphs) { - for (Paragraph p : paragraphs) { - if (p.getId().equals(paragraphId)) { - return populateParagraphInfo(p); - } - } - return new HashMap<>(); - } - } - - private Map<String, String> populateParagraphInfo(Paragraph p) { - Map<String, String> info = new HashMap<>(); - info.put("id", p.getId()); - info.put("status", p.getStatus().toString()); - if (p.getDateStarted() != null) { - info.put("started", p.getDateStarted().toString()); - } - if (p.getDateFinished() != null) { - info.put("finished", p.getDateFinished().toString()); - } - if (p.getStatus().isRunning()) { - info.put("progress", String.valueOf(p.progress())); - } else { - info.put("progress", String.valueOf(100)); - } - return info; - } - private void setParagraphMagic(Paragraph p, int index) { if (paragraphs.size() > 0) { String replName; @@ -771,6 +731,9 @@ public class Note implements JsonSerializable { boolean blocking, boolean isolated, Map<String, Object> params) throws Exception { + if (isRunning()) { + throw new Exception("Unable to run note:" + id + " because it is still in RUNNING state."); + } setIsolatedMode(isolated); setRunning(true); setStartTime(DATE_TIME_FORMATTER.format(LocalDateTime.now()));