This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new 409ef7b [ZEPPELIN-4499]. Corrupted note make all the notes unavailable 409ef7b is described below commit 409ef7b2359b44f22da9e5c6d18682e9e5837550 Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Mon Dec 23 10:09:18 2019 +0800 [ZEPPELIN-4499]. Corrupted note make all the notes unavailable ### What is this PR for? The root cause is that we didn't catch the right exception when reading the note JSON from NotebookRepo. This PR fixes this issue and also does some code refactoring, such as checking whether a note is null before using it. ### What type of PR is it? [Bug Fix] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-4499 ### How should this be tested? CI passed. https://travis-ci.org/zjffdu/zeppelin/builds/628928499 Also verify it manually. ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #3560 from zjffdu/ZEPPELIN-4499 and squashes the following commits: a57227257 [Jeff Zhang] [ZEPPELIN-4499]. 
Corrupted note make all the notes unavailable --- .../integration/ZeppelinSparkClusterTest.java | 2 - .../zeppelin/notebook/repo/MongoNotebookRepo.java | 2 +- .../notebook/repo/OldMongoNotebookRepo.java | 2 +- .../org/apache/zeppelin/rest/HeliumRestApi.java | 17 +- .../org/apache/zeppelin/rest/NotebookRestApi.java | 1 + .../apache/zeppelin/service/JobManagerService.java | 3 + .../apache/zeppelin/service/NotebookService.java | 4 +- .../org/apache/zeppelin/socket/NotebookServer.java | 91 +++++---- .../zeppelin/utils/InterpreterBindingUtils.java | 3 +- .../cluster/ClusterNoteEventListenerTest.java | 7 +- .../zeppelin/rest/NotebookSecurityRestApiTest.java | 2 - .../apache/zeppelin/rest/ZeppelinRestApiTest.java | 25 ++- .../apache/zeppelin/socket/NotebookServerTest.java | 2 +- .../zeppelin/helium/HeliumApplicationFactory.java | 12 +- .../interpreter/RemoteInterpreterEventServer.java | 7 +- .../remote/RemoteInterpreterProcessListener.java | 2 +- .../zeppelin/notebook/AuthorizationService.java | 212 +++++++++++++-------- .../java/org/apache/zeppelin/notebook/Note.java | 18 +- .../org/apache/zeppelin/notebook/NoteManager.java | 9 +- .../org/apache/zeppelin/notebook/Notebook.java | 101 +++++----- .../repo/zeppelinhub/websocket/ZeppelinClient.java | 4 +- .../zeppelin/notebook/scheduler/CronJob.java | 13 +- .../notebook/scheduler/QuartzSchedulerService.java | 9 +- .../org/apache/zeppelin/notebook/NoteTest.java | 3 +- 24 files changed, 342 insertions(+), 209 deletions(-) diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java index 9397c49..0cb6db8 100644 --- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java +++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java @@ -994,8 +994,6 @@ 
public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi { p1.setText("%spark\nimport com.databricks.spark.csv._"); note.run(p1.getId(), true); assertEquals(Status.FINISHED, p1.getStatus()); - - TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); } finally { if (null != note) { TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); diff --git a/zeppelin-plugins/notebookrepo/mongo/src/main/java/org/apache/zeppelin/notebook/repo/MongoNotebookRepo.java b/zeppelin-plugins/notebookrepo/mongo/src/main/java/org/apache/zeppelin/notebook/repo/MongoNotebookRepo.java index 9fdd180..34ebec6 100644 --- a/zeppelin-plugins/notebookrepo/mongo/src/main/java/org/apache/zeppelin/notebook/repo/MongoNotebookRepo.java +++ b/zeppelin-plugins/notebookrepo/mongo/src/main/java/org/apache/zeppelin/notebook/repo/MongoNotebookRepo.java @@ -432,7 +432,7 @@ public class MongoNotebookRepo implements NotebookRepo { /** * Convert document to note. */ - private Note documentToNote(Document doc) { + private Note documentToNote(Document doc) throws IOException { // document to JSON String json = doc.toJson(); // JSON to note diff --git a/zeppelin-plugins/notebookrepo/mongo/src/main/java/org/apache/zeppelin/notebook/repo/OldMongoNotebookRepo.java b/zeppelin-plugins/notebookrepo/mongo/src/main/java/org/apache/zeppelin/notebook/repo/OldMongoNotebookRepo.java index 0cd07eb..e637f3a 100644 --- a/zeppelin-plugins/notebookrepo/mongo/src/main/java/org/apache/zeppelin/notebook/repo/OldMongoNotebookRepo.java +++ b/zeppelin-plugins/notebookrepo/mongo/src/main/java/org/apache/zeppelin/notebook/repo/OldMongoNotebookRepo.java @@ -174,7 +174,7 @@ public class OldMongoNotebookRepo implements OldNotebookRepo { /** * Convert document to note */ - private Note documentToNote(Document doc) { + private Note documentToNote(Document doc) throws IOException { // document to JSON String json = doc.toJson(); // JSON to note diff --git 
a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/HeliumRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/HeliumRestApi.java index f3827b8..05b5e77 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/HeliumRestApi.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/HeliumRestApi.java @@ -23,6 +23,7 @@ import com.google.gson.reflect.TypeToken; import javax.inject.Inject; import javax.inject.Singleton; import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -119,7 +120,13 @@ public class HeliumRestApi { @Path("suggest/{noteId}/{paragraphId}") public Response suggest(@PathParam("noteId") String noteId, @PathParam("paragraphId") String paragraphId) { - Note note = notebook.getNote(noteId); + Note note = null; + try { + note = notebook.getNote(noteId); + } catch (IOException e) { + return new JsonResponse(Response.Status.NOT_FOUND, + "Fail to get note: " + noteId + "\n" + ExceptionUtils.getStackTrace(e)).build(); + } if (note == null) { return new JsonResponse(Response.Status.NOT_FOUND, "Note " + noteId + " not found").build(); } @@ -142,7 +149,13 @@ public class HeliumRestApi { @Path("load/{noteId}/{paragraphId}") public Response load(@PathParam("noteId") String noteId, @PathParam("paragraphId") String paragraphId, String heliumPackage) { - Note note = notebook.getNote(noteId); + Note note = null; + try { + note = notebook.getNote(noteId); + } catch (IOException e) { + return new JsonResponse(Response.Status.NOT_FOUND, + "Fail to get note: " + noteId + "\n" + ExceptionUtils.getStackTrace(e)).build(); + } if (note == null) { return new JsonResponse(Response.Status.NOT_FOUND, "Note " + noteId + " not found").build(); } diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java 
b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java index 5aa776a..49409bc 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java @@ -250,6 +250,7 @@ public class NotebookRestApi extends AbstractRestApi { gson.fromJson(req, new TypeToken<HashMap<String, HashSet<String>>>() { }.getType()); Note note = notebook.getNote(noteId); + checkIfNoteIsNotNull(note); LOG.info("Set permissions {} {} {} {} {} {}", noteId, principal, permMap.get("owners"), permMap.get("readers"), permMap.get("runners"), permMap.get("writers")); diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/service/JobManagerService.java b/zeppelin-server/src/main/java/org/apache/zeppelin/service/JobManagerService.java index b22d6b3..9798f03 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/service/JobManagerService.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/service/JobManagerService.java @@ -51,6 +51,9 @@ public class JobManagerService { throws IOException { List<NoteJobInfo> notesJobInfo = new ArrayList<>(); Note jobNote = notebook.getNote(noteId); + if (jobNote == null) { + callback.onFailure(new IOException("Note " + noteId + " not found"), context); + } notesJobInfo.add(new NoteJobInfo(jobNote)); callback.onSuccess(notesJobInfo, context); return notesJobInfo; diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java b/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java index 0567c4b..5fd32b9 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java @@ -102,7 +102,7 @@ public class NotebookService { note = notebook.getNote(noteId); if (note != null) { if (!checkPermission(noteId, Permission.READER, Message.OP.GET_HOME_NOTE, context, - callback)) { 
+ callback)) { return null; } } @@ -802,7 +802,7 @@ public class NotebookService { } Note revisionNote = null; if (revisionId.equals("Head")) { - revisionNote = notebook.getNote(noteId); + revisionNote = note; } else { revisionNote = notebook.getNoteByRevision(noteId, note.getPath(), revisionId, context.getAutheInfo()); diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index 356f7be..21d165c 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -658,7 +658,11 @@ public class NotebookServer extends WebSocketServlet if (StringUtils.equals(key, "AuthenticationInfo")) { authenticationInfo = AuthenticationInfo.fromJson(json); } else if (StringUtils.equals(key, "Note")) { - note = Note.fromJson(json); + try { + note = Note.fromJson(json); + } catch (IOException e) { + LOG.warn("Fail to parse note json", e); + } } else if (StringUtils.equals(key, "Paragraph")) { paragraph = Paragraph.fromJson(json); } else if (StringUtils.equals(key, "Set<String>")) { @@ -1591,15 +1595,22 @@ public class NotebookServer extends WebSocketServlet InterpreterResult.Type type, String output) { Message msg = new Message(OP.PARAGRAPH_UPDATE_OUTPUT).put("noteId", noteId) .put("paragraphId", paragraphId).put("index", index).put("type", type).put("data", output); - Note note = getNotebook().getNote(noteId); - - if (note.isPersonalizedMode()) { - String user = note.getParagraph(paragraphId).getUser(); - if (null != user) { - connectionManager.multicastToUser(user, msg); + try { + Note note = getNotebook().getNote(noteId); + if (note == null) { + LOG.warn("Note " + noteId + " note found"); + return; } - } else { - connectionManager.broadcast(noteId, msg); + if (note.isPersonalizedMode()) { + String user = note.getParagraph(paragraphId).getUser(); + if (null != 
user) { + connectionManager.multicastToUser(user, msg); + } + } else { + connectionManager.broadcast(noteId, msg); + } + } catch (IOException e) { + LOG.warn("Fail to call onOutputUpdated", e); } } @@ -1608,14 +1619,18 @@ public class NotebookServer extends WebSocketServlet */ @Override public void onOutputClear(String noteId, String paragraphId) { - final Note note = getNotebook().getNote(noteId); - if (note == null) { - // It is possible the note is removed, but the job is still running - LOG.warn("Note {} doesn't existed, it maybe deleted.", noteId); - } else { - note.clearParagraphOutput(paragraphId); - Paragraph paragraph = note.getParagraph(paragraphId); - broadcastParagraph(note, paragraph); + try { + final Note note = getNotebook().getNote(noteId); + if (note == null) { + // It is possible the note is removed, but the job is still running + LOG.warn("Note {} doesn't existed, it maybe deleted.", noteId); + } else { + note.clearParagraphOutput(paragraphId); + Paragraph paragraph = note.getParagraph(paragraphId); + broadcastParagraph(note, paragraph); + } + } catch (IOException e) { + LOG.warn("Fail to call onOutputClear", e); } } @@ -1985,32 +2000,36 @@ public class NotebookServer extends WebSocketServlet @Override public void onParaInfosReceived(String noteId, String paragraphId, String interpreterSettingId, Map<String, String> metaInfos) { - Note note = getNotebook().getNote(noteId); - if (note != null) { - Paragraph paragraph = note.getParagraph(paragraphId); - if (paragraph != null) { - InterpreterSetting setting = getNotebook().getInterpreterSettingManager() - .get(interpreterSettingId); - String label = metaInfos.get("label"); - String tooltip = metaInfos.get("tooltip"); - List<String> keysToRemove = Arrays.asList("noteId", "paraId", "label", "tooltip"); - for (String removeKey : keysToRemove) { - metaInfos.remove(removeKey); - } + try { + Note note = getNotebook().getNote(noteId); + if (note != null) { + Paragraph paragraph = 
note.getParagraph(paragraphId); + if (paragraph != null) { + InterpreterSetting setting = getNotebook().getInterpreterSettingManager() + .get(interpreterSettingId); + String label = metaInfos.get("label"); + String tooltip = metaInfos.get("tooltip"); + List<String> keysToRemove = Arrays.asList("noteId", "paraId", "label", "tooltip"); + for (String removeKey : keysToRemove) { + metaInfos.remove(removeKey); + } - paragraph - .updateRuntimeInfos(label, tooltip, metaInfos, setting.getGroup(), setting.getId()); - connectionManager.broadcast( - note.getId(), - new Message(OP.PARAS_INFO).put("id", paragraphId).put("infos", - paragraph.getRuntimeInfos())); + paragraph + .updateRuntimeInfos(label, tooltip, metaInfos, setting.getGroup(), setting.getId()); + connectionManager.broadcast( + note.getId(), + new Message(OP.PARAS_INFO).put("id", paragraphId).put("infos", + paragraph.getRuntimeInfos())); + } } + } catch (IOException e) { + LOG.warn("Fail to call onParaInfosReceived", e); } } @Override public List<ParagraphInfo> getParagraphList(String user, String noteId) - throws TException, ServiceException { + throws TException, IOException { Notebook notebook = getNotebook(); Note note = notebook.getNote(noteId); if (null == note) { diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/utils/InterpreterBindingUtils.java b/zeppelin-server/src/main/java/org/apache/zeppelin/utils/InterpreterBindingUtils.java index 3ea2f75..9f6a001 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/utils/InterpreterBindingUtils.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/utils/InterpreterBindingUtils.java @@ -20,6 +20,7 @@ import org.apache.zeppelin.interpreter.InterpreterSetting; import org.apache.zeppelin.notebook.Notebook; import org.apache.zeppelin.types.InterpreterSettingsList; +import java.io.IOException; import java.util.LinkedList; import java.util.List; @@ -28,7 +29,7 @@ import java.util.List; */ public class InterpreterBindingUtils { public static 
List<InterpreterSettingsList> getInterpreterBindings(Notebook notebook, - String noteId) { + String noteId) throws IOException { List<InterpreterSettingsList> settingList = new LinkedList<>(); List<InterpreterSetting> selectedSettings = notebook.getBindedInterpreterSettings(noteId); diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterNoteEventListenerTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterNoteEventListenerTest.java index cf5687c..24a225f 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterNoteEventListenerTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterNoteEventListenerTest.java @@ -26,6 +26,7 @@ import org.apache.zeppelin.user.AuthenticationInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.Map; import java.util.Set; @@ -55,7 +56,11 @@ public class ClusterNoteEventListenerTest implements ClusterEventListener { authenticationInfo = AuthenticationInfo.fromJson(json); LOGGER.debug(authenticationInfo.toJson()); } else if (key.equals("Note")) { - note = Note.fromJson(json); + try { + note = Note.fromJson(json); + } catch (IOException e) { + LOGGER.warn("Fail to parse note json", e); + } LOGGER.debug(note.toJson()); } else if (key.equals("Paragraph")) { paragraph = Paragraph.fromJson(json); diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookSecurityRestApiTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookSecurityRestApiTest.java index 0da1c1d..b949d05 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookSecurityRestApiTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookSecurityRestApiTest.java @@ -17,7 +17,6 @@ package org.apache.zeppelin.rest; import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static 
org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; @@ -25,7 +24,6 @@ import static org.junit.Assert.assertThat; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import java.io.IOException; -import java.util.ArrayList; import java.util.Map; import org.apache.commons.httpclient.HttpMethodBase; import org.apache.commons.httpclient.methods.DeleteMethod; diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java index 99d97cf..bb16b94 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java @@ -220,7 +220,6 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi { @Test public void testDeleteNote() throws IOException { LOG.info("testDeleteNote"); - Note note = null; try { //Create note and get ID @@ -229,7 +228,9 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi { testDeleteNote(noteId); } finally { if (null != note) { - TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + if (TestUtils.getInstance(Notebook.class).getNote(note.getId()) != null) { + TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + } } } } @@ -268,7 +269,6 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi { String exportJSON = (String) resp.get("body"); assertNotNull("Can not find new notejson", exportJSON); LOG.info("export JSON:=" + exportJSON); - TestUtils.getInstance(Notebook.class).removeNote(sourceNoteId, anonymous); get.releaseConnection(); } finally { if (null != note) { @@ -280,6 +280,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi { @Test public void testImportNotebook() throws IOException { Note note = null; + Note newNote = null; Map<String, Object> resp; String oldJson; String noteName; @@ -301,14 +302,7 @@ 
public class ZeppelinRestApiTest extends AbstractTestRestApi { oldJson = getNoteContent(sourceNoteId); // delete it first then import it TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); - } finally { - if (null != note) { - TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); - } - } - Note newNote = null; - try { // call note post PostMethod importPost = httpPost("/notebook/import/", oldJson); assertThat(importPost, isAllowed()); @@ -322,12 +316,17 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi { assertEquals("Compare note names", noteName, newNote.getName()); assertEquals("Compare paragraphs count", note.getParagraphs().size(), newNote.getParagraphs() .size()); - // cleanup - TestUtils.getInstance(Notebook.class).removeNote(newNote.getId(), anonymous); importPost.releaseConnection(); } finally { if (null != note) { - TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + if (TestUtils.getInstance(Notebook.class).getNote(note.getId()) != null) { + TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + } + } + if (null != newNote) { + if (TestUtils.getInstance(Notebook.class).getNote(newNote.getId()) != null) { + TestUtils.getInstance(Notebook.class).removeNote(newNote.getId(), anonymous); + } } } } diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java index bd15bdb..6e12f6c 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java @@ -597,7 +597,7 @@ public class NotebookServerTest extends AbstractTestRestApi { } @Test - public void testRuntimeInfos() { + public void testRuntimeInfos() throws IOException { // mock note String msg = "{\"op\":\"IMPORT_NOTE\",\"data\":" + "{\"note\":{\"paragraphs\": [{\"text\": \"Test 
" + diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumApplicationFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumApplicationFactory.java index 2251e22..bdd6e83 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumApplicationFactory.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumApplicationFactory.java @@ -394,12 +394,18 @@ public class HeliumApplicationFactory implements ApplicationEventListener, NoteE return null; } - Note note = notebook.getNote(noteId); - - if (note == null) { + Note note = null; + try { + note = notebook.getNote(noteId); + if (note == null) { + logger.warn("Note " + noteId + " not found"); + return null; + } + } catch (IOException e) { logger.error("Can't get note {}", noteId); return null; } + Paragraph paragraph = note.getParagraph(paragraphId); if (paragraph == null) { logger.error("Can't get paragraph {}", paragraphId); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java index 41164ef..42de705 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java @@ -360,7 +360,12 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi + ", user = " + user); if (user != null && noteId != null) { - List<ParagraphInfo> paragraphInfos = listener.getParagraphList(user, noteId); + List<ParagraphInfo> paragraphInfos = null; + try { + paragraphInfos = listener.getParagraphList(user, noteId); + } catch (IOException e) { + throw new TException(e); + } return paragraphInfos; } else { LOGGER.error("user or noteId is null!"); diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java index 7584c42..f077204 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java @@ -41,5 +41,5 @@ public interface RemoteInterpreterProcessListener { public void onParaInfosReceived(String noteId, String paragraphId, String interpreterSettingId, Map<String, String> metaInfos); - List<ParagraphInfo> getParagraphList(String user, String noteId) throws TException, ServiceException; + List<ParagraphInfo> getParagraphList(String user, String noteId) throws TException, IOException; } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/AuthorizationService.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/AuthorizationService.java index 91face1..f712aa0 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/AuthorizationService.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/AuthorizationService.java @@ -33,6 +33,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.inject.Inject; +import java.io.IOException; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -50,9 +51,7 @@ public class AuthorizationService implements ClusterEventListener { private ZeppelinConfiguration conf; private Notebook notebook; - /* - * contains roles for each user - */ + // contains roles for each user (username --> roles) private Map<String, Set<String>> userRoles = new HashMap<>(); @Inject @@ -71,105 +70,165 @@ public class AuthorizationService implements ClusterEventListener { return returnUser; } - public void setOwners(String noteId, Set<String> entities) { + public void 
setOwners(String noteId, Set<String> entities) throws IOException { inlineSetOwners(noteId, entities); broadcastClusterEvent(ClusterEvent.SET_OWNERS_PERMISSIONS, noteId, null, entities); } - private void inlineSetOwners(String noteId, Set<String> entities) { + private void inlineSetOwners(String noteId, Set<String> entities) throws IOException { entities = validateUser(entities); notebook.getNote(noteId).setOwners(entities); } - public void setReaders(String noteId, Set<String> entities) { + public void setReaders(String noteId, Set<String> entities) throws IOException { inlineSetReaders(noteId, entities); broadcastClusterEvent(ClusterEvent.SET_READERS_PERMISSIONS, noteId, null, entities); } - private void inlineSetReaders(String noteId, Set<String> entities) { + private void inlineSetReaders(String noteId, Set<String> entities) throws IOException { entities = validateUser(entities); notebook.getNote(noteId).setReaders(entities); } - public void setRunners(String noteId, Set<String> entities) { + public void setRunners(String noteId, Set<String> entities) throws IOException { inlineSetRunners(noteId, entities); broadcastClusterEvent(ClusterEvent.SET_RUNNERS_PERMISSIONS, noteId, null, entities); } - private void inlineSetRunners(String noteId, Set<String> entities) { + private void inlineSetRunners(String noteId, Set<String> entities) throws IOException { entities = validateUser(entities); notebook.getNote(noteId).setRunners(entities); } - public void setWriters(String noteId, Set<String> entities) { + public void setWriters(String noteId, Set<String> entities) throws IOException { inlineSetWriters(noteId, entities); broadcastClusterEvent(ClusterEvent.SET_WRITERS_PERMISSIONS, noteId, null, entities); } - private void inlineSetWriters(String noteId, Set<String> entities) { + private void inlineSetWriters(String noteId, Set<String> entities) throws IOException { entities = validateUser(entities); notebook.getNote(noteId).setWriters(entities); } public Set<String> 
getOwners(String noteId) { - Set<String> entities = notebook.getNote(noteId).getOwners(); - if (entities != null) { - return entities; - } else { - return EMPTY_SET ; + try { + Note note = notebook.getNote(noteId); + if (note == null) { + LOGGER.warn("Note " + noteId + " not found"); + return EMPTY_SET; + } + return note.getOwners(); + } catch (IOException e) { + LOGGER.warn("Fail to getOwner for note: " + noteId, e); + return EMPTY_SET; } } public Set<String> getReaders(String noteId) { - Set<String> entities = notebook.getNote(noteId).getReaders(); - if (entities != null) { - return entities; - } else { - return EMPTY_SET ; + try { + Note note = notebook.getNote(noteId); + if (note == null) { + LOGGER.warn("Note " + noteId + " not found"); + return EMPTY_SET; + } + return note.getReaders(); + } catch (IOException e) { + LOGGER.warn("Fail to getReaders for note: " + noteId, e); + return EMPTY_SET; } } public Set<String> getRunners(String noteId) { - Set<String> entities = notebook.getNote(noteId).getRunners(); - if (entities != null) { - return entities; - } else { - return EMPTY_SET ; + try { + Note note = notebook.getNote(noteId); + if (note == null) { + LOGGER.warn("Note " + noteId + " not found"); + return EMPTY_SET; + } + return note.getRunners(); + } catch (IOException e) { + LOGGER.warn("Fail to getRunners for note: " + noteId, e); + return EMPTY_SET; } } public Set<String> getWriters(String noteId) { - Set<String> entities = notebook.getNote(noteId).getWriters(); - if (entities != null) { - return entities; - } else { - return EMPTY_SET ; + try { + Note note = notebook.getNote(noteId); + if (note == null) { + LOGGER.warn("Note " + noteId + " not found"); + return EMPTY_SET; + } + return note.getWriters(); + } catch (IOException e) { + LOGGER.warn("Fail to getWriters for note: " + noteId, e); + return EMPTY_SET; } } public boolean isOwner(String noteId, Set<String> entities) { - return isMember(entities, notebook.getNote(noteId).getOwners()) || 
isAdmin(entities); + try { + Note note = notebook.getNote(noteId); + if (note == null) { + LOGGER.warn("Note " + noteId + " not found"); + return false; + } + return isMember(entities, note.getOwners()) || isAdmin(entities); + } catch (IOException e) { + LOGGER.warn("Fail to check isOwner for note: " + noteId, e); + return false; + } } public boolean isWriter(String noteId, Set<String> entities) { - return isMember(entities, notebook.getNote(noteId).getWriters()) || - isMember(entities, notebook.getNote(noteId).getOwners()) || - isAdmin(entities); + try { + Note note = notebook.getNote(noteId); + if (note == null) { + LOGGER.warn("Note " + noteId + " not found"); + return false; + } + return isMember(entities, note.getWriters()) || + isMember(entities, note.getOwners()) || + isAdmin(entities); + } catch (IOException e) { + LOGGER.warn("Fail to check isWriter for note: " + noteId, e); + return false; + } } public boolean isReader(String noteId, Set<String> entities) { - return isMember(entities, notebook.getNote(noteId).getReaders()) || - isMember(entities, notebook.getNote(noteId).getOwners()) || - isMember(entities, notebook.getNote(noteId).getWriters()) || - isMember(entities, notebook.getNote(noteId).getRunners()) || - isAdmin(entities); + try { + Note note = notebook.getNote(noteId); + if (note == null) { + LOGGER.warn("Note " + noteId + " not found"); + return false; + } + return isMember(entities, note.getReaders()) || + isMember(entities, note.getOwners()) || + isMember(entities, note.getWriters()) || + isMember(entities, note.getRunners()) || + isAdmin(entities); + } catch (IOException e) { + LOGGER.warn("Fail to check isReader for note: " + noteId, e); + return false; + } } public boolean isRunner(String noteId, Set<String> entities) { - return isMember(entities, notebook.getNote(noteId).getRunners()) || - isMember(entities, notebook.getNote(noteId).getWriters()) || - isMember(entities, notebook.getNote(noteId).getOwners()) || - isAdmin(entities); + try { 
+ Note note = notebook.getNote(noteId); + if (note == null) { + LOGGER.warn("Note " + noteId + " not found"); + return false; + } + return isMember(entities, note.getRunners()) || + isMember(entities, note.getWriters()) || + isMember(entities, note.getOwners()) || + isAdmin(entities); + } catch (IOException e) { + LOGGER.warn("Fail to check isRunner for note: " + noteId, e); + return false; + } } private boolean isAdmin(Set<String> entities) { @@ -258,25 +317,12 @@ public class AuthorizationService implements ClusterEventListener { return roles; } - public List<NoteInfo> filterByUser(List<NoteInfo> notes, AuthenticationInfo subject) { - final Set<String> entities = Sets.newHashSet(); - if (subject != null) { - entities.add(subject.getUser()); - } - return FluentIterable.from(notes).filter(new Predicate<NoteInfo>() { - @Override - public boolean apply(NoteInfo input) { - return input != null && isReader(input.getId(), entities); - } - }).toList(); - } - - public void clearPermission(String noteId) { + public void clearPermission(String noteId) throws IOException { inlineClearPermission(noteId); broadcastClusterEvent(ClusterEvent.CLEAR_PERMISSION, noteId, null, null); } - public void inlineClearPermission(String noteId) { + public void inlineClearPermission(String noteId) throws IOException { notebook.getNote(noteId).setReaders(Sets.newHashSet()); notebook.getNote(noteId).setRunners(Sets.newHashSet()); notebook.getNote(noteId).setWriters(Sets.newHashSet()); @@ -298,28 +344,32 @@ public class AuthorizationService implements ClusterEventListener { Set<String> set = gson.fromJson(jsonSet, new TypeToken<Set<String>>() { }.getType()); - switch (message.clusterEvent) { - case SET_READERS_PERMISSIONS: - inlineSetReaders(noteId, set); - break; - case SET_WRITERS_PERMISSIONS: - inlineSetWriters(noteId, set); - break; - case SET_OWNERS_PERMISSIONS: - inlineSetOwners(noteId, set); - break; - case SET_RUNNERS_PERMISSIONS: - inlineSetRunners(noteId, set); - break; - case 
SET_ROLES: - inlineSetRoles(user, set); - break; - case CLEAR_PERMISSION: - inlineClearPermission(noteId); - break; - default: - LOGGER.error("Unknown clusterEvent:{}, msg:{} ", message.clusterEvent, msg); - break; + try { + switch (message.clusterEvent) { + case SET_READERS_PERMISSIONS: + inlineSetReaders(noteId, set); + break; + case SET_WRITERS_PERMISSIONS: + inlineSetWriters(noteId, set); + break; + case SET_OWNERS_PERMISSIONS: + inlineSetOwners(noteId, set); + break; + case SET_RUNNERS_PERMISSIONS: + inlineSetRunners(noteId, set); + break; + case SET_ROLES: + inlineSetRoles(user, set); + break; + case CLEAR_PERMISSION: + inlineClearPermission(noteId); + break; + default: + LOGGER.error("Unknown clusterEvent:{}, msg:{} ", message.clusterEvent, msg); + break; + } + } catch (IOException e) { + LOGGER.warn("Fail to broadcast msg", e); } } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java index c1d07e0..b07ed4f 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java @@ -1029,20 +1029,24 @@ public class Note implements JsonSerializable { return gson.toJson(this); } - public static Note fromJson(String json) { - try - { + /** + * Parse note json from note file. Throw IOException if fail to parse note json. 
+ * + * @param json + * @return Note + * @throws IOException if fail to parse note json (note file may be corrupted) + */ + public static Note fromJson(String json) throws IOException { + try { Note note = gson.fromJson(json, Note.class); note.setCronSupported(ZeppelinConfiguration.create()); convertOldInput(note); note.info.remove("isRunning"); note.postProcessParagraphs(); - return note; } catch (Exception e) { - logger.error("Unable to parse notebook: " + e.toString()); - - return null; + logger.error("Unable to parse note json: " + e.toString()); + throw new IOException("Fail to parse note json: " + json, e); } } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteManager.java index 5ac3633..b64dd4d 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteManager.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteManager.java @@ -88,7 +88,7 @@ public class NoteManager { for (String notePath : notesInfo.values()) { try { notes.add(getNoteNode(notePath).getNote()); - } catch (IOException e) { + } catch (Exception e) { LOGGER.warn("Fail to load note: " + notePath, e); } } @@ -266,6 +266,13 @@ public class NoteManager { return notes; } + /** + * Get note from NotebookRepo. + * + * @param noteId + * @return return null if not found on NotebookRepo. 
+ * @throws IOException + */ public Note getNote(String noteId) throws IOException { String notePath = this.notesInfo.get(noteId); if (notePath == null) { diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java index 5805160..cdf8150 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java @@ -176,11 +176,15 @@ public class Notebook { * @throws IOException, IllegalArgumentException */ public String exportNote(String noteId) throws IOException { - Note note = getNote(noteId); - if (note == null) { + try { + Note note = getNote(noteId); + if (note == null) { + throw new IOException("Note " + noteId + " not found"); + } + return note.toJson(); + } catch (IOException e) { throw new IOException(noteId + " not found"); } - return note.toJson(); } /** @@ -211,7 +215,7 @@ public class Notebook { * @param sourceNoteId - the note ID to clone * @param newNotePath - the path of the new note * @return noteId - * @throws IOException, CloneNotSupportedException, IllegalArgumentException + * @throws IOException */ public Note cloneNote(String sourceNoteId, String newNotePath, AuthenticationInfo subject) throws IOException { @@ -231,31 +235,34 @@ public class Notebook { public void removeNote(String noteId, AuthenticationInfo subject) throws IOException { LOGGER.info("Remove note " + noteId); Note note = getNote(noteId); - if (null != note) { - noteManager.removeNote(noteId, subject); - fireNoteRemoveEvent(note, subject); + if (note == null) { + throw new IOException("Note " + noteId + " not found"); } + noteManager.removeNote(noteId, subject); + fireNoteRemoveEvent(note, subject); } - public Note getNote(String id) { - try { - Note note = noteManager.getNote(id); - if (note == null) { - return null; - } - note.setInterpreterFactory(replFactory); - 
note.setInterpreterSettingManager(interpreterSettingManager); - note.setParagraphJobListener(paragraphJobListener); - note.setNoteEventListeners(noteEventListeners); - note.setCredentials(credentials); - for (Paragraph p : note.getParagraphs()) { - p.setNote(note); - } - return note; - } catch (IOException e) { - LOGGER.warn("Fail to get note: " + id, e); + /** + * Get note from NotebookRepo and also initialize it with other properties that is not + * persistent in NotebookRepo, such as paragraphJobListener. + * @param noteId + * @return null if note not found. + * @throws IOException when fail to get it from NotebookRepo. + */ + public Note getNote(String noteId) throws IOException { + Note note = noteManager.getNote(noteId); + if (note == null) { return null; } + note.setInterpreterFactory(replFactory); + note.setInterpreterSettingManager(interpreterSettingManager); + note.setParagraphJobListener(paragraphJobListener); + note.setNoteEventListeners(noteEventListeners); + note.setCredentials(credentials); + for (Paragraph p : note.getParagraphs()) { + p.setNote(note); + } + return note; } public void saveNote(Note note, AuthenticationInfo subject) throws IOException { @@ -361,7 +368,8 @@ public class Notebook { try { note = noteManager.getNote(id); } catch (IOException e) { - LOGGER.error("Failed to load " + id, e); + LOGGER.error("Fail to get note: " + id, e); + return null; } if (note == null) { return null; @@ -519,33 +527,30 @@ public class Notebook { } } - - public List<InterpreterSetting> getBindedInterpreterSettings(String noteId) { - Note note = getNote(noteId); - if (note != null) { - Set<InterpreterSetting> settings = new HashSet<>(); - for (Paragraph p : note.getParagraphs()) { - try { - Interpreter intp = p.getBindedInterpreter(); - settings.add(( - (ManagedInterpreterGroup) intp.getInterpreterGroup()).getInterpreterSetting()); - } catch (InterpreterNotFoundException e) { - // ignore this - } - } - // add the default interpreter group - 
InterpreterSetting defaultIntpSetting = - interpreterSettingManager.getByName(note.getDefaultInterpreterGroup()); - if (defaultIntpSetting != null) { - settings.add(defaultIntpSetting); + public List<InterpreterSetting> getBindedInterpreterSettings(String noteId) throws IOException { + Note note = getNote(noteId); + if (note == null) { + return new ArrayList<>(); + } + Set<InterpreterSetting> settings = new HashSet<>(); + for (Paragraph p : note.getParagraphs()) { + try { + Interpreter intp = p.getBindedInterpreter(); + settings.add(( + (ManagedInterpreterGroup) intp.getInterpreterGroup()).getInterpreterSetting()); + } catch (InterpreterNotFoundException e) { + // ignore this } - return new ArrayList<>(settings); - } else { - return new LinkedList<>(); } + // add the default interpreter group + InterpreterSetting defaultIntpSetting = + interpreterSettingManager.getByName(note.getDefaultInterpreterGroup()); + if (defaultIntpSetting != null) { + settings.add(defaultIntpSetting); + } + return new ArrayList<>(settings); } - public InterpreterFactory getInterpreterFactory() { return replFactory; } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClient.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClient.java index 3031a59..20b9e0f 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClient.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClient.java @@ -287,7 +287,7 @@ public class ZeppelinClient { return getNoteMsg; } - public void handleMsgFromZeppelin(String message, String noteId) { + public void handleMsgFromZeppelin(String message, String noteId) throws IOException { Map<String, String> meta = new HashMap<>(); //TODO(khalid): don't use zeppelinhubToken in this class, decouple meta.put("noteId", noteId); @@ -315,7 +315,7 @@ public class 
ZeppelinClient { } - private void relayToAllZeppelinHub(ZeppelinhubMessage hubMsg, String noteId) { + private void relayToAllZeppelinHub(ZeppelinhubMessage hubMsg, String noteId) throws IOException { if (StringUtils.isBlank(noteId)) { return; } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJob.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJob.java index 0a7b5da..f4d6ebe 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJob.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJob.java @@ -17,6 +17,7 @@ package org.apache.zeppelin.notebook.scheduler; +import java.io.IOException; import java.util.Map; import org.apache.commons.lang3.StringUtils; @@ -40,7 +41,17 @@ public class CronJob implements org.quartz.Job { Notebook notebook = (Notebook) jobDataMap.get("notebook"); String noteId = jobDataMap.getString("noteId"); - Note note = notebook.getNote(noteId); + Note note = null; + try { + note = notebook.getNote(noteId); + if (note == null) { + logger.warn("Note " + noteId + " not found"); + return; + } + } catch (IOException e) { + logger.warn("Fail to get note: " + noteId, e); + return; + } if (note.haveRunningOrPendingParagraphs()) { logger.warn( "execution of the cron job is skipped because there is a running or pending " diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/QuartzSchedulerService.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/QuartzSchedulerService.java index b96c6ff..bceeb9c 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/QuartzSchedulerService.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/QuartzSchedulerService.java @@ -17,6 +17,7 @@ package org.apache.zeppelin.notebook.scheduler; +import java.io.IOException; import java.util.Collections; import java.util.Map; import 
java.util.Set; @@ -71,7 +72,13 @@ public class QuartzSchedulerService implements SchedulerService { @Override public void refreshCron(String noteId) { removeCron(noteId); - Note note = notebook.getNote(noteId); + Note note = null; + try { + note = notebook.getNote(noteId); + } catch (IOException e) { + LOGGER.warn("Fail to get note: " + noteId, e); + return; + } if (note == null || note.isTrash()) { return; } diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteTest.java index ece74ca..0c97726 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteTest.java @@ -35,6 +35,7 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; +import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -170,7 +171,7 @@ public class NoteTest { assertNotEquals(System.identityHashCode(user1Paragraph), System.identityHashCode(user2Paragraph)); } - public void testNoteJson() { + public void testNoteJson() throws IOException { Note note = new Note("test", "", interpreterFactory, interpreterSettingManager, paragraphJobListener, credentials, noteEventListener); note.setName("/test_note"); note.getConfig().put("config_1", "value_1");