Repository: zeppelin Updated Branches: refs/heads/branch-0.7 11e897dff -> 42385220e
[HOTFIX][ZEPPELIN-2178] Prevent from cleaning output in "Personalized Mode" Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/42385220 Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/42385220 Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/42385220 Branch: refs/heads/branch-0.7 Commit: 42385220e6d43e7757429f4a10df2a2e6fcadbe4 Parents: 11e897d Author: Jongyoul Lee <jongy...@gmail.com> Authored: Tue Mar 7 14:46:33 2017 +0900 Committer: Jongyoul Lee <jongy...@gmail.com> Committed: Tue Mar 7 14:47:59 2017 +0900 ---------------------------------------------------------------------- .../apache/zeppelin/socket/NotebookServer.java | 34 +++++- .../java/org/apache/zeppelin/notebook/Note.java | 9 ++ .../org/apache/zeppelin/notebook/Paragraph.java | 32 ++++-- .../apache/zeppelin/notebook/ParagraphTest.java | 107 +++++++++++++++++++ 4 files changed, 172 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/42385220/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index e2ffa0a..6791b63 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -1127,6 +1127,17 @@ public class NotebookServer extends WebSocketServlet p.setConfig(config); p.setTitle((String) fromMessage.get("title")); p.setText((String) fromMessage.get("paragraph")); + + subject = new AuthenticationInfo(fromMessage.principal); + if (note.isPersonalizedMode()) { + p = p.getUserParagraph(subject.getUser()); + p.settings.setParams(params); + p.setConfig(config); + p.setTitle((String) fromMessage.get("title")); + p.setText((String) fromMessage.get("paragraph")); + } + + note.persist(subject); if (note.isPersonalizedMode()) { @@ -1647,6 +1658,15 @@ public class NotebookServer extends WebSocketServlet p.settings.setParams(params); p.setConfig(config); + if (note.isPersonalizedMode()) { + p = note.getParagraph(paragraphId); + p.setText(text); + p.setTitle(title); + p.setAuthenticationInfo(subject); + p.settings.setParams(params); + p.setConfig(config); + } + return p; } @@ -1767,7 +1787,15 @@ public class NotebookServer extends WebSocketServlet InterpreterResult.Type type, String output) { Message msg = new Message(OP.PARAGRAPH_UPDATE_OUTPUT).put("noteId", noteId) .put("paragraphId", paragraphId).put("index", index).put("type", type).put("data", output); - broadcast(noteId, msg); + Note note = notebook().getNote(noteId); + if (note.isPersonalizedMode()) { + String user = note.getParagraph(paragraphId).getUser(); + if (null != user) { + multicastToUser(user, msg); + } + } else { + broadcast(noteId, msg); + } } @@ -2036,7 +2064,9 @@ public class NotebookServer extends WebSocketServlet } } if (job instanceof Paragraph) { - notebookServer.broadcastParagraph(note, (Paragraph) job); + Paragraph p = (Paragraph) job; + p.setStatusToUserParagraph(job.getStatus()); + notebookServer.broadcastParagraph(note, p); } try { notebookServer.broadcastUpdateNoteJobInfo(System.currentTimeMillis() - 5000); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/42385220/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java index 35f32f3..f341e16 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java @@ -137,6 +137,15 @@ public class Note implements Serializable, ParagraphJobListener { valueString = "false"; } getConfig().put("personalizedMode", valueString); + clearUserParagraphs(value); + } + + private void clearUserParagraphs(boolean isPersonalized) { + if (!isPersonalized) { + for (Paragraph p : paragraphs) { + p.clearUserParagraphs(); + } + } } public String getId() { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/42385220/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java index f609ecb..cb6e0c7 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java @@ -49,6 +49,7 @@ import com.google.common.annotations.VisibleForTesting; * Paragraph is a representation of an execution unit. */ public class Paragraph extends Job implements Serializable, Cloneable { + private static final long serialVersionUID = -6328572073497992016L; private static Logger logger = LoggerFactory.getLogger(Paragraph.class); @@ -123,6 +124,9 @@ public class Paragraph extends Job implements Serializable, Cloneable { } public Paragraph getUserParagraph(String user) { + if (!userParagraphMap.containsKey(user)) { + cloneParagraphForUser(user); + } return userParagraphMap.get(user); } @@ -139,12 +143,16 @@ public class Paragraph extends Job implements Serializable, Cloneable { p.setTitle(getTitle()); p.setText(getText()); p.setResult(getReturn()); - p.setStatus(getStatus()); + p.setStatus(Status.READY); p.setId(getId()); addUser(p, user); return p; } + public void clearUserParagraphs() { + userParagraphMap.clear(); + } + public void addUser(Paragraph p, String user) { userParagraphMap.put(user, p); } @@ -370,6 +378,10 @@ public class Paragraph extends Job implements Serializable, Cloneable { } } + for (Paragraph p : userParagraphMap.values()) { + p.setText(getText()); + } + String script = getScriptBody(); // inject form if (repl.getFormType() == FormType.NATIVE) { @@ -401,13 +413,9 @@ public class Paragraph extends Job implements Serializable, Cloneable { List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage(); resultMessages.addAll(ret.message()); - for (Paragraph p : userParagraphMap.values()) { - p.setText(getText()); - } - InterpreterResult res = new InterpreterResult(ret.code(), resultMessages); - Paragraph p = userParagraphMap.get(getUser()); + Paragraph p = getUserParagraph(getUser()); if (null != p) { p.setResult(res); p.settings.setParams(settings.getParams()); @@ -526,12 +534,12 @@ public class Paragraph extends Job implements Serializable, Cloneable { Credentials credentials = note.getCredentials(); if (authenticationInfo != null) { UserCredentials userCredentials = - credentials.getUserCredentials(authenticationInfo.getUser()); + credentials.getUserCredentials(authenticationInfo.getUser()); authenticationInfo.setUserCredentials(userCredentials); } InterpreterContext interpreterContext = - new InterpreterContext(note.getId(), getId(), getRequiredReplName(), this.getTitle(), + new InterpreterContext(note.getId(), getId(), getRequiredReplName(), this.getTitle(), this.getText(), this.getAuthenticationInfo(), this.getConfig(), this.settings, registry, resourcePool, runners, output); return interpreterContext; @@ -574,7 +582,15 @@ public class Paragraph extends Job implements Serializable, Cloneable { return new ParagraphRunner(note, note.getId(), getId()); } + public void setStatusToUserParagraph(Status status) { + String user = getUser(); + if (null != user) { + getUserParagraph(getUser()).setStatus(status); + } + } + static class ParagraphRunner extends InterpreterContextRunner { + private transient Note note; public ParagraphRunner(Note note, String noteId, String paragraphId) { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/42385220/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/ParagraphTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/ParagraphTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/ParagraphTest.java index 69577e9..0e77846 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/ParagraphTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/ParagraphTest.java @@ -19,22 +19,48 @@ package org.apache.zeppelin.notebook; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.google.common.collect.Lists; +import java.util.List; import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectBuilder; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.Input; import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.Interpreter.FormType; +import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterFactory; +import org.apache.zeppelin.interpreter.InterpreterGroup; +import org.apache.zeppelin.interpreter.InterpreterOption; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.apache.zeppelin.interpreter.InterpreterResult.Type; +import org.apache.zeppelin.interpreter.InterpreterResultMessage; +import org.apache.zeppelin.interpreter.InterpreterSetting; +import org.apache.zeppelin.interpreter.InterpreterSetting.Status; +import org.apache.zeppelin.interpreter.InterpreterSettingManager; +import org.apache.zeppelin.resource.ResourcePool; +import org.apache.zeppelin.scheduler.JobListener; +import org.apache.zeppelin.user.AuthenticationInfo; +import org.apache.zeppelin.user.Credentials; import org.junit.Test; import java.util.HashMap; import java.util.Map; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; public class ParagraphTest { @Test @@ -125,4 +151,85 @@ public class ParagraphTest { verify(registry).get("age", noteId, null); assertEquals(actual, expected); } + + @Test + public void returnDefaultParagraphWithNewUser() { + Paragraph p = new Paragraph("para_1", null, null, null, null); + Object defaultValue = "Default Value"; + p.setResult(defaultValue); + Paragraph newUserParagraph = p.getUserParagraph("new_user"); + assertNotNull(newUserParagraph); + assertEquals(defaultValue, newUserParagraph.getReturn()); + } + + @Test + public void returnUnchangedResultsWithDifferentUser() throws Throwable { + InterpreterSettingManager mockInterpreterSettingManager = mock(InterpreterSettingManager.class); + Note mockNote = mock(Note.class); + when(mockNote.getCredentials()).thenReturn(mock(Credentials.class)); + Paragraph spyParagraph = spy(new Paragraph("para_1", mockNote, null, null, mockInterpreterSettingManager)); + + doReturn("spy").when(spyParagraph).getRequiredReplName(); + + + Interpreter mockInterpreter = mock(Interpreter.class); + doReturn(mockInterpreter).when(spyParagraph).getRepl(anyString()); + + InterpreterGroup mockInterpreterGroup = mock(InterpreterGroup.class); + when(mockInterpreter.getInterpreterGroup()).thenReturn(mockInterpreterGroup); + when(mockInterpreterGroup.getId()).thenReturn("mock_id_1"); + when(mockInterpreterGroup.getAngularObjectRegistry()).thenReturn(mock(AngularObjectRegistry.class)); + when(mockInterpreterGroup.getResourcePool()).thenReturn(mock(ResourcePool.class)); + + List<InterpreterSetting> spyInterpreterSettingList = spy(Lists.<InterpreterSetting>newArrayList()); + InterpreterSetting mockInterpreterSetting = mock(InterpreterSetting.class); + InterpreterOption mockInterpreterOption = mock(InterpreterOption.class); + when(mockInterpreterSetting.getOption()).thenReturn(mockInterpreterOption); + when(mockInterpreterOption.permissionIsSet()).thenReturn(false); + when(mockInterpreterSetting.getStatus()).thenReturn(Status.READY); + when(mockInterpreterSetting.getId()).thenReturn("mock_id_1"); + when(mockInterpreterSetting.getInterpreterGroup(anyString(), anyString())).thenReturn(mockInterpreterGroup); + spyInterpreterSettingList.add(mockInterpreterSetting); + when(mockNote.getId()).thenReturn("any_id"); + when(mockInterpreterSettingManager.getInterpreterSettings(anyString())).thenReturn(spyInterpreterSettingList); + + doReturn("spy script body").when(spyParagraph).getScriptBody(); + + when(mockInterpreter.getFormType()).thenReturn(FormType.NONE); + + ParagraphJobListener mockJobListener = mock(ParagraphJobListener.class); + doReturn(mockJobListener).when(spyParagraph).getListener(); + doNothing().when(mockJobListener).onOutputUpdateAll(Mockito.<Paragraph>any(), Mockito.anyList()); + + InterpreterResult mockInterpreterResult = mock(InterpreterResult.class); + when(mockInterpreter.interpret(anyString(), Mockito.<InterpreterContext>any())).thenReturn(mockInterpreterResult); + when(mockInterpreterResult.code()).thenReturn(Code.SUCCESS); + + + // Actual test + List<InterpreterResultMessage> result1 = Lists.newArrayList(); + result1.add(new InterpreterResultMessage(Type.TEXT, "result1")); + when(mockInterpreterResult.message()).thenReturn(result1); + + AuthenticationInfo user1 = new AuthenticationInfo("user1"); + spyParagraph.setAuthenticationInfo(user1); + spyParagraph.jobRun(); + Paragraph p1 = spyParagraph.getUserParagraph(user1.getUser()); + + List<InterpreterResultMessage> result2 = Lists.newArrayList(); + result2.add(new InterpreterResultMessage(Type.TEXT, "result2")); + when(mockInterpreterResult.message()).thenReturn(result2); + + AuthenticationInfo user2 = new AuthenticationInfo("user2"); + spyParagraph.setAuthenticationInfo(user2); + spyParagraph.jobRun(); + Paragraph p2 = spyParagraph.getUserParagraph(user2.getUser()); + + assertNotEquals(p1.getReturn().toString(), p2.getReturn().toString()); + + assertEquals(p1, spyParagraph.getUserParagraph(user1.getUser())); + + + + } }