This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new 211268d [ZEPPELIN-4655]. authorizationService in ConnectionManager is null 211268d is described below commit 211268d61a2eaee6a896c2b83af3b3266308940a Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Sun Mar 1 14:58:50 2020 +0800 [ZEPPELIN-4655]. authorizationService in ConnectionManager is null ### What is this PR for? A few sentences describing the overall goals of the pull request's commits. First time? Check out the contributing guide - https://zeppelin.apache.org/contribution/contributions.html ### What type of PR is it? [Bug Fix | Improvement | Feature | Documentation | Hot Fix | Refactoring] ### Todos * [ ] - Task ### What is the Jira issue? * Open an issue on Jira https://issues.apache.org/jira/browse/ZEPPELIN/ * Put link here, and add [ZEPPELIN-*Jira number*] in PR title, e.g. [ZEPPELIN-533] ### How should this be tested? * First time? Set up Travis CI as described on https://zeppelin.apache.org/contribution/contributions.html#continuous-integration * Strongly recommended: add automated unit tests for any new or changed behavior * Outline any manual steps to test the PR here. ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? * Are there breaking changes for older versions? * Does this need documentation? Author: Jeff Zhang <zjf...@apache.org> Closes #3670 from zjffdu/ZEPPELIN-4655 and squashes the following commits: 51803d742 [Jeff Zhang] [ZEPPELIN-4655]. 
authorizationService in ConnectionManager is null --- .../org/apache/zeppelin/server/ZeppelinServer.java | 4 +- .../apache/zeppelin/socket/ConnectionManager.java | 5 +- .../org/apache/zeppelin/socket/NotebookServer.java | 148 ++--- .../apache/zeppelin/cluster/ClusterEventTest.java | 2 +- .../apache/zeppelin/socket/NotebookServerTest.java | 615 +++++++++++---------- 5 files changed, 400 insertions(+), 374 deletions(-) diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java index 0e5be41..0c9b1cf 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java @@ -65,6 +65,7 @@ import org.apache.zeppelin.search.LuceneSearch; import org.apache.zeppelin.search.SearchService; import org.apache.zeppelin.service.*; import org.apache.zeppelin.service.AuthenticationService; +import org.apache.zeppelin.socket.ConnectionManager; import org.apache.zeppelin.socket.NotebookServer; import org.apache.zeppelin.user.AuthenticationInfo; import org.apache.zeppelin.user.Credentials; @@ -161,7 +162,8 @@ public class ZeppelinServer extends ResourceConfig { bindAsContract(GsonProvider.class).in(Singleton.class); bindAsContract(WebApplicationExceptionMapper.class).in(Singleton.class); bindAsContract(AdminService.class).in(Singleton.class); - bindAsContract(AuthorizationService.class).to(Singleton.class); + bindAsContract(AuthorizationService.class).in(Singleton.class); + bindAsContract(ConnectionManager.class).in(Singleton.class); // TODO(jl): Will make it more beautiful if (!StringUtils.isBlank(conf.getShiroPath())) { bind(ShiroAuthenticationService.class).to(AuthenticationService.class).in(Singleton.class); diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java 
b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java index 0df4db4..489a59f 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java @@ -40,6 +40,7 @@ import org.eclipse.jetty.websocket.api.WebSocketException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.inject.Inject; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -87,8 +88,8 @@ public class ConnectionManager { private AuthorizationService authorizationService; - public void setAuthorizationService( - AuthorizationService authorizationService) { + @Inject + public ConnectionManager(AuthorizationService authorizationService) { this.authorizationService = authorizationService; } diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index 01c463d..b177f64 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -139,8 +139,6 @@ public class NotebookServer extends WebSocketServlet .registerTypeAdapterFactory(Input.TypeAdapterFactory).create(); private static AtomicReference<NotebookServer> self = new AtomicReference<>(); - private ConnectionManager connectionManager; - private ExecutorService executorService = Executors.newFixedThreadPool(10); private Provider<Notebook> notebookProvider; @@ -148,9 +146,9 @@ public class NotebookServer extends WebSocketServlet private Provider<AuthorizationService> authorizationServiceProvider; private Provider<ConfigurationService> configurationServiceProvider; private Provider<JobManagerService> jobManagerServiceProvider; + private Provider<ConnectionManager> connectionManagerProvider; public NotebookServer() { - this.connectionManager = 
new ConnectionManager(); NotebookServer.self.set(this); LOG.info("NotebookServer instantiated: {}", this); } @@ -181,6 +179,12 @@ public class NotebookServer extends WebSocketServlet } @Inject + public void setConnectionManagerProvider(Provider<ConnectionManager> connectionManagerProvider) { + this.connectionManagerProvider = connectionManagerProvider; + LOG.info("Injected ConnectionManagerProvider"); + } + + @Inject public void setConfigurationService( Provider<ConfigurationService> configurationServiceProvider) { this.configurationServiceProvider = configurationServiceProvider; @@ -200,6 +204,10 @@ public class NotebookServer extends WebSocketServlet return notebookProvider.get(); } + public ConnectionManager getConnectionManager() { + return connectionManagerProvider.get(); + } + public NotebookService getNotebookService() { return notebookServiceProvider.get(); } @@ -233,7 +241,7 @@ public class NotebookServer extends WebSocketServlet @Override public void onOpen(NotebookSocket conn) { LOG.info("New connection from {}", conn); - connectionManager.addConnection(conn); + getConnectionManager().addConnection(conn); } @Override @@ -284,7 +292,7 @@ public class NotebookServer extends WebSocketServlet } if (StringUtils.isEmpty(conn.getUser())) { - connectionManager.addUserConnection(messagereceived.principal, conn); + getConnectionManager().addUserConnection(messagereceived.principal, conn); } // Lets be elegant here @@ -433,7 +441,7 @@ public class NotebookServer extends WebSocketServlet getInterpreterSettings(conn, messagereceived); break; case WATCHER: - connectionManager.switchConnectionToWatcher(conn); + getConnectionManager().switchConnectionToWatcher(conn); break; case SAVE_NOTE_FORMS: saveNoteForms(conn, messagereceived); @@ -460,13 +468,9 @@ public class NotebookServer extends WebSocketServlet @Override public void onClose(NotebookSocket conn, int code, String reason) { LOG.info("Closed connection to {} ({}) {}", conn, code, reason); - 
connectionManager.removeConnection(conn); - connectionManager.removeConnectionFromAllNote(conn); - connectionManager.removeUserConnection(conn.getUser(), conn); - } - - public ConnectionManager getConnectionManager() { - return connectionManager; + getConnectionManager().removeConnection(conn); + getConnectionManager().removeConnectionFromAllNote(conn); + getConnectionManager().removeUserConnection(conn.getUser(), conn); } protected Message deserializeMessage(String msg) { @@ -478,12 +482,12 @@ public class NotebookServer extends WebSocketServlet } public void broadcast(Message m) { - connectionManager.broadcast(m); + getConnectionManager().broadcast(m); } public void unicastNoteJobInfo(NotebookSocket conn, Message fromMessage) throws IOException { - connectionManager.addNoteConnection(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(), conn); + getConnectionManager().addNoteConnection(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(), conn); getJobManagerService().getNoteJobInfoByUnixTime(0, getServiceContext(fromMessage), new WebSocketServiceCallback<List<JobManagerService.NoteJobInfo>>(conn) { @Override @@ -513,7 +517,7 @@ public class NotebookServer extends WebSocketServlet Map<String, Object> response = new HashMap<>(); response.put("lastResponseUnixTime", System.currentTimeMillis()); response.put("jobs", notesJobInfo); - connectionManager.broadcast(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(), + getConnectionManager().broadcast(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(), new Message(OP.LIST_UPDATE_NOTE_JOBS).put("noteRunningJobs", response)); } @@ -525,7 +529,7 @@ public class NotebookServer extends WebSocketServlet } public void unsubscribeNoteJobInfo(NotebookSocket conn) { - connectionManager.removeNoteConnection(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(), conn); + getConnectionManager().removeNoteConnection(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(), conn); } public void getInterpreterBindings(NotebookSocket conn, Message fromMessage) throws 
IOException { @@ -578,7 +582,7 @@ public class NotebookServer extends WebSocketServlet private void inlineBroadcastNote(Note note) { Message message = new Message(OP.NOTE).put("note", note); - connectionManager.broadcast(note.getId(), message); + getConnectionManager().broadcast(note.getId(), message); } private void inlineBroadcastParagraph(Note note, Paragraph p) { @@ -588,7 +592,7 @@ public class NotebookServer extends WebSocketServlet broadcastParagraphs(p.getUserParagraphMap(), p); } else { Message message = new Message(OP.PARAGRAPH).put("paragraph", p); - connectionManager.broadcast(note.getId(), message); + getConnectionManager().broadcast(note.getId(), message); } } @@ -602,7 +606,7 @@ public class NotebookServer extends WebSocketServlet if (null != userParagraphMap) { for (String user : userParagraphMap.keySet()) { Message message = new Message(OP.PARAGRAPH).put("paragraph", userParagraphMap.get(user)); - connectionManager.multicastToUser(user, message); + getConnectionManager().multicastToUser(user, message); } } } @@ -618,7 +622,7 @@ public class NotebookServer extends WebSocketServlet int paraIndex = note.getParagraphs().indexOf(para); Message message = new Message(OP.PARAGRAPH_ADDED).put("paragraph", para).put("index", paraIndex); - connectionManager.broadcast(note.getId(), message); + getConnectionManager().broadcast(note.getId(), message); } private void broadcastNewParagraph(Note note, Paragraph para) { @@ -634,9 +638,9 @@ public class NotebookServer extends WebSocketServlet List<NoteInfo> notesInfo = getNotebook().getNotesInfo( noteId -> getNotebookAuthorizationService().isReader(noteId, userAndRoles)); Message message = new Message(OP.NOTES_INFO).put("notes", notesInfo); - connectionManager.multicastToUser(subject.getUser(), message); + getConnectionManager().multicastToUser(subject.getUser(), message); //to others afterwards - connectionManager.broadcastNoteListExcept(notesInfo, subject); + 
getConnectionManager().broadcastNoteListExcept(notesInfo, subject); } public void broadcastNoteList(AuthenticationInfo subject, Set<String> userAndRoles) { @@ -754,7 +758,7 @@ public class NotebookServer extends WebSocketServlet public void onSuccess(List<NoteInfo> notesInfo, ServiceContext context) throws IOException { super.onSuccess(notesInfo, context); - connectionManager.unicast(new Message(OP.NOTES_INFO).put("notes", notesInfo), conn); + getConnectionManager().unicast(new Message(OP.NOTES_INFO).put("notes", notesInfo), conn); } }); } @@ -767,10 +771,10 @@ public class NotebookServer extends WebSocketServlet public void onSuccess(List<NoteInfo> notesInfo, ServiceContext context) throws IOException { super.onSuccess(notesInfo, context); - connectionManager.multicastToUser(context.getAutheInfo().getUser(), + getConnectionManager().multicastToUser(context.getAutheInfo().getUser(), new Message(OP.NOTES_INFO).put("notes", notesInfo)); //to others afterwards - connectionManager.broadcastNoteListExcept(notesInfo, context.getAutheInfo()); + getConnectionManager().broadcastNoteListExcept(notesInfo, context.getAutheInfo()); } }); } @@ -813,7 +817,7 @@ public class NotebookServer extends WebSocketServlet new WebSocketServiceCallback<Note>(conn) { @Override public void onSuccess(Note note, ServiceContext context) throws IOException { - connectionManager.addNoteConnection(note.getId(), conn); + getConnectionManager().addNoteConnection(note.getId(), conn); conn.send(serializeMessage(new Message(OP.NOTE).put("note", note))); updateAngularObjectRegistry(conn, note); sendAllAngularObjects(note, context.getAutheInfo().getUser(), conn); @@ -858,11 +862,11 @@ public class NotebookServer extends WebSocketServlet public void onSuccess(Note note, ServiceContext context) throws IOException { super.onSuccess(note, context); if (note != null) { - connectionManager.addNoteConnection(note.getId(), conn); + getConnectionManager().addNoteConnection(note.getId(), conn); 
conn.send(serializeMessage(new Message(OP.NOTE).put("note", note))); sendAllAngularObjects(note, context.getAutheInfo().getUser(), conn); } else { - connectionManager.removeConnectionFromAllNote(conn); + getConnectionManager().removeConnectionFromAllNote(conn); conn.send(serializeMessage(new Message(OP.NOTE).put("note", null))); } } @@ -885,7 +889,7 @@ public class NotebookServer extends WebSocketServlet new WebSocketServiceCallback<Note>(conn) { @Override public void onSuccess(Note note, ServiceContext context) throws IOException { - connectionManager.broadcast(note.getId(), new Message(OP.NOTE_UPDATED).put("name", name) + getConnectionManager().broadcast(note.getId(), new Message(OP.NOTE_UPDATED).put("name", name) .put("config", config) .put("info", note.getInfo())); broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles()); @@ -904,7 +908,7 @@ public class NotebookServer extends WebSocketServlet @Override public void onSuccess(Note note, ServiceContext context) throws IOException { super.onSuccess(note, context); - connectionManager.broadcastNote(note); + getConnectionManager().broadcastNote(note); } }); } @@ -956,7 +960,7 @@ public class NotebookServer extends WebSocketServlet @Override public void onSuccess(Note note, ServiceContext context) throws IOException { super.onSuccess(note, context); - connectionManager.addNoteConnection(note.getId(), conn); + getConnectionManager().addNoteConnection(note.getId(), conn); conn.send(serializeMessage(new Message(OP.NEW_NOTE).put("note", note))); broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles()); } @@ -978,7 +982,7 @@ public class NotebookServer extends WebSocketServlet @Override public void onSuccess(String message, ServiceContext context) throws IOException { super.onSuccess(message, context); - connectionManager.removeNoteConnection(noteId); + getConnectionManager().removeNoteConnection(noteId); broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles()); } }); @@ -996,7 +1000,7 
@@ public class NotebookServer extends WebSocketServlet ServiceContext context) throws IOException { super.onSuccess(notesInfo, context); for (NoteInfo noteInfo : notesInfo) { - connectionManager.removeNoteConnection(noteInfo.getId()); + getConnectionManager().removeNoteConnection(noteInfo.getId()); } broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles()); } @@ -1089,7 +1093,7 @@ public class NotebookServer extends WebSocketServlet private void updateParagraph(NotebookSocket conn, Message fromMessage) throws IOException { String paragraphId = (String) fromMessage.get("id"); - String noteId = connectionManager.getAssociatedNoteId(conn); + String noteId = getConnectionManager().getAssociatedNoteId(conn); if (noteId == null) { noteId = (String) fromMessage.get("noteId"); } @@ -1125,7 +1129,7 @@ public class NotebookServer extends WebSocketServlet return; } - String noteId = connectionManager.getAssociatedNoteId(conn); + String noteId = getConnectionManager().getAssociatedNoteId(conn); if (noteId == null) { noteId = fromMessage.getType("noteId", LOG); if (noteId == null) { @@ -1146,21 +1150,21 @@ public class NotebookServer extends WebSocketServlet super.onSuccess(result, context); Message message = new Message(OP.PATCH_PARAGRAPH).put("patch", result) .put("paragraphId", paragraphId); - connectionManager.broadcastExcept(noteId2, message, conn); + getConnectionManager().broadcastExcept(noteId2, message, conn); } }); } private void cloneNote(NotebookSocket conn, Message fromMessage) throws IOException { - String noteId = connectionManager.getAssociatedNoteId(conn); + String noteId = getConnectionManager().getAssociatedNoteId(conn); String name = (String) fromMessage.get("name"); getNotebookService().cloneNote(noteId, name, getServiceContext(fromMessage), new WebSocketServiceCallback<Note>(conn) { @Override public void onSuccess(Note newNote, ServiceContext context) throws IOException { super.onSuccess(newNote, context); - 
connectionManager.addNoteConnection(newNote.getId(), conn); + getConnectionManager().addNoteConnection(newNote.getId(), conn); conn.send(serializeMessage(new Message(OP.NEW_NOTE).put("note", newNote))); broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles()); } @@ -1225,13 +1229,13 @@ public class NotebookServer extends WebSocketServlet private void removeParagraph(NotebookSocket conn, Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); - String noteId = connectionManager.getAssociatedNoteId(conn); + String noteId = getConnectionManager().getAssociatedNoteId(conn); getNotebookService().removeParagraph(noteId, paragraphId, getServiceContext(fromMessage), new WebSocketServiceCallback<Paragraph>(conn) { @Override public void onSuccess(Paragraph p, ServiceContext context) throws IOException { super.onSuccess(p, context); - connectionManager.broadcast(p.getNote().getId(), new Message(OP.PARAGRAPH_REMOVED). + getConnectionManager().broadcast(p.getNote().getId(), new Message(OP.PARAGRAPH_REMOVED). 
put("id", p.getId())); } }); @@ -1240,14 +1244,14 @@ public class NotebookServer extends WebSocketServlet private void clearParagraphOutput(NotebookSocket conn, Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); - String noteId = connectionManager.getAssociatedNoteId(conn); + String noteId = getConnectionManager().getAssociatedNoteId(conn); getNotebookService().clearParagraphOutput(noteId, paragraphId, getServiceContext(fromMessage), new WebSocketServiceCallback<Paragraph>(conn) { @Override public void onSuccess(Paragraph p, ServiceContext context) throws IOException { super.onSuccess(p, context); if (p.getNote().isPersonalizedMode()) { - connectionManager.unicastParagraph(p.getNote(), p, context.getAutheInfo().getUser()); + getConnectionManager().unicastParagraph(p.getNote(), p, context.getAutheInfo().getUser()); } else { broadcastParagraph(p.getNote(), p); } @@ -1257,7 +1261,7 @@ public class NotebookServer extends WebSocketServlet private void completion(NotebookSocket conn, Message fromMessage) throws IOException { - String noteId = connectionManager.getAssociatedNoteId(conn); + String noteId = getConnectionManager().getAssociatedNoteId(conn); String paragraphId = (String) fromMessage.get("id"); String buffer = (String) fromMessage.get("buf"); int cursor = (int) Double.parseDouble(fromMessage.get("cursor").toString()); @@ -1305,7 +1309,7 @@ public class NotebookServer extends WebSocketServlet @Override public void onSuccess(AngularObject ao, ServiceContext context) throws IOException { super.onSuccess(ao, context); - connectionManager.broadcastExcept(noteId, + getConnectionManager().broadcastExcept(noteId, new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao) .put("interpreterGroupId", interpreterGroupId).put("noteId", noteId) .put("paragraphId", ao.getParagraphId()), conn); @@ -1387,7 +1391,7 @@ public class NotebookServer extends WebSocketServlet final AngularObject ao = 
remoteRegistry.addAndNotifyRemoteProcess(varName, varValue, noteId, paragraphId); - connectionManager.broadcastExcept(noteId, new Message(OP.ANGULAR_OBJECT_UPDATE) + getConnectionManager().broadcastExcept(noteId, new Message(OP.ANGULAR_OBJECT_UPDATE) .put("angularObject", ao) .put("interpreterGroupId", interpreterGroupId).put("noteId", noteId) .put("paragraphId", paragraphId), conn); @@ -1401,7 +1405,7 @@ public class NotebookServer extends WebSocketServlet NotebookSocket conn) { final AngularObject ao = remoteRegistry.removeAndNotifyRemoteProcess(varName, noteId, paragraphId); - connectionManager.broadcastExcept(noteId, new Message(OP.ANGULAR_OBJECT_REMOVE) + getConnectionManager().broadcastExcept(noteId, new Message(OP.ANGULAR_OBJECT_REMOVE) .put("angularObject", ao) .put("interpreterGroupId", interpreterGroupId).put("noteId", noteId) .put("paragraphId", paragraphId), conn); @@ -1413,14 +1417,14 @@ public class NotebookServer extends WebSocketServlet Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); final int newIndex = (int) Double.parseDouble(fromMessage.get("index").toString()); - String noteId = connectionManager.getAssociatedNoteId(conn); + String noteId = getConnectionManager().getAssociatedNoteId(conn); getNotebookService().moveParagraph(noteId, paragraphId, newIndex, getServiceContext(fromMessage), new WebSocketServiceCallback<Paragraph>(conn) { @Override public void onSuccess(Paragraph result, ServiceContext context) throws IOException { super.onSuccess(result, context); - connectionManager.broadcast(result.getNote().getId(), + getConnectionManager().broadcast(result.getNote().getId(), new Message(OP.PARAGRAPH_MOVED).put("id", paragraphId).put("index", newIndex)); } }); @@ -1429,7 +1433,7 @@ public class NotebookServer extends WebSocketServlet private String insertParagraph(NotebookSocket conn, Message fromMessage) throws IOException { final int index = (int) 
Double.parseDouble(fromMessage.get("index").toString()); - String noteId = connectionManager.getAssociatedNoteId(conn); + String noteId = getConnectionManager().getAssociatedNoteId(conn); Map<String, Object> config; if (fromMessage.get("config") != null) { config = (Map<String, Object>) fromMessage.get("config"); @@ -1464,7 +1468,7 @@ public class NotebookServer extends WebSocketServlet private void cancelParagraph(NotebookSocket conn, Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); - String noteId = connectionManager.getAssociatedNoteId(conn); + String noteId = getConnectionManager().getAssociatedNoteId(conn); getNotebookService().cancelParagraph(noteId, paragraphId, getServiceContext(fromMessage), new WebSocketServiceCallback<>(conn)); } @@ -1484,14 +1488,14 @@ public class NotebookServer extends WebSocketServlet private void broadcastSpellExecution(NotebookSocket conn, Message fromMessage) throws IOException { - String noteId = connectionManager.getAssociatedNoteId(conn); + String noteId = getConnectionManager().getAssociatedNoteId(conn); getNotebookService().spell(noteId, fromMessage, getServiceContext(fromMessage), new WebSocketServiceCallback<Paragraph>(conn) { @Override public void onSuccess(Paragraph p, ServiceContext context) throws IOException { super.onSuccess(p, context); // broadcast to other clients only - connectionManager.broadcastExcept(p.getNote().getId(), + getConnectionManager().broadcastExcept(p.getNote().getId(), new Message(OP.RUN_PARAGRAPH_USING_SPELL).put("paragraph", p), conn); } }); @@ -1500,7 +1504,7 @@ public class NotebookServer extends WebSocketServlet private void runParagraph(NotebookSocket conn, Message fromMessage) throws IOException { String paragraphId = (String) fromMessage.get("id"); - String noteId = connectionManager.getAssociatedNoteId(conn); + String noteId = getConnectionManager().getAssociatedNoteId(conn); String text = (String) fromMessage.get("paragraph"); String 
title = (String) fromMessage.get("title"); Map<String, Object> params = (Map<String, Object>) fromMessage.get("params"); @@ -1514,7 +1518,7 @@ public class NotebookServer extends WebSocketServlet if (p.getNote().isPersonalizedMode()) { Paragraph p2 = p.getNote().clearPersonalizedParagraphOutput(paragraphId, context.getAutheInfo().getUser()); - connectionManager.unicastParagraph(p.getNote(), p2, context.getAutheInfo().getUser()); + getConnectionManager().unicastParagraph(p.getNote(), p2, context.getAutheInfo().getUser()); } // if it's the last paragraph and not empty, let's add a new one @@ -1645,7 +1649,7 @@ public class NotebookServer extends WebSocketServlet public void onOutputAppend(String noteId, String paragraphId, int index, String output) { Message msg = new Message(OP.PARAGRAPH_APPEND_OUTPUT).put("noteId", noteId) .put("paragraphId", paragraphId).put("index", index).put("data", output); - connectionManager.broadcast(noteId, msg); + getConnectionManager().broadcast(noteId, msg); } /** @@ -1669,10 +1673,10 @@ public class NotebookServer extends WebSocketServlet if (note.isPersonalizedMode()) { String user = note.getParagraph(paragraphId).getUser(); if (null != user) { - connectionManager.multicastToUser(user, msg); + getConnectionManager().multicastToUser(user, msg); } } else { - connectionManager.broadcast(noteId, msg); + getConnectionManager().broadcast(noteId, msg); } } catch (IOException e) { LOG.warn("Fail to call onOutputUpdated", e); @@ -1708,7 +1712,7 @@ public class NotebookServer extends WebSocketServlet Message msg = new Message(OP.APP_APPEND_OUTPUT).put("noteId", noteId).put("paragraphId", paragraphId) .put("index", index).put("appId", appId).put("data", output); - connectionManager.broadcast(noteId, msg); + getConnectionManager().broadcast(noteId, msg); } /** @@ -1720,14 +1724,14 @@ public class NotebookServer extends WebSocketServlet Message msg = new Message(OP.APP_UPDATE_OUTPUT).put("noteId", noteId).put("paragraphId", paragraphId) 
.put("index", index).put("type", type).put("appId", appId).put("data", output); - connectionManager.broadcast(noteId, msg); + getConnectionManager().broadcast(noteId, msg); } @Override public void onLoad(String noteId, String paragraphId, String appId, HeliumPackage pkg) { Message msg = new Message(OP.APP_LOAD).put("noteId", noteId).put("paragraphId", paragraphId) .put("appId", appId).put("pkg", pkg); - connectionManager.broadcast(noteId, msg); + getConnectionManager().broadcast(noteId, msg); } @Override @@ -1735,7 +1739,7 @@ public class NotebookServer extends WebSocketServlet Message msg = new Message(OP.APP_STATUS_CHANGE).put("noteId", noteId).put("paragraphId", paragraphId) .put("appId", appId).put("status", status); - connectionManager.broadcast(noteId, msg); + getConnectionManager().broadcast(noteId, msg); } @@ -1869,7 +1873,7 @@ public class NotebookServer extends WebSocketServlet Map<String, Object> response = new HashMap<>(); response.put("lastResponseUnixTime", System.currentTimeMillis()); response.put("jobs", notesJobInfo); - connectionManager.broadcast(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(), + getConnectionManager().broadcast(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(), new Message(OP.LIST_UPDATE_NOTE_JOBS).put("noteRunningJobs", response)); } } @@ -1877,7 +1881,7 @@ public class NotebookServer extends WebSocketServlet @Override public void onProgressUpdate(Paragraph p, int progress) { - connectionManager.broadcast(p.getNote().getId(), + getConnectionManager().broadcast(p.getNote().getId(), new Message(OP.PROGRESS).put("id", p.getId()).put("progress", progress)); } @@ -1930,7 +1934,7 @@ public class NotebookServer extends WebSocketServlet @Override public void noteRunningStatusChange(String noteId, boolean newStatus) { - connectionManager.broadcast( + getConnectionManager().broadcast( noteId, new Message(OP.NOTE_RUNNING_STATUS ).put("status", newStatus)); @@ -1984,7 +1988,7 @@ public class NotebookServer extends WebSocketServlet continue; } 
- connectionManager.broadcast(note.getId(), new Message(OP.ANGULAR_OBJECT_UPDATE) + getConnectionManager().broadcast(note.getId(), new Message(OP.ANGULAR_OBJECT_UPDATE) .put("angularObject", object) .put("interpreterGroupId", interpreterGroupId).put("noteId", note.getId()) .put("paragraphId", object.getParagraphId())); @@ -2003,7 +2007,7 @@ public class NotebookServer extends WebSocketServlet getNotebook().getInterpreterSettingManager().getSettingIds(); for (String id : settingIds) { if (interpreterGroupId.contains(id)) { - connectionManager.broadcast(note.getId(), + getConnectionManager().broadcast(note.getId(), new Message(OP.ANGULAR_OBJECT_REMOVE).put("name", name).put("noteId", noteId) .put("paragraphId", paragraphId)); break; @@ -2015,7 +2019,7 @@ public class NotebookServer extends WebSocketServlet private void getEditorSetting(NotebookSocket conn, Message fromMessage) throws IOException { String paragraphId = (String) fromMessage.get("paragraphId"); String magic = (String) fromMessage.get("magic"); - String noteId = connectionManager.getAssociatedNoteId(conn); + String noteId = getConnectionManager().getAssociatedNoteId(conn); getNotebookService().getEditorSetting(noteId, magic, getServiceContext(fromMessage), @@ -2070,7 +2074,7 @@ public class NotebookServer extends WebSocketServlet paragraph .updateRuntimeInfos(label, tooltip, metaInfos, setting.getGroup(), setting.getId()); - connectionManager.broadcast( + getConnectionManager().broadcast( note.getId(), new Message(OP.PARAS_INFO).put("id", paragraphId).put("infos", paragraph.getRuntimeInfos())); @@ -2121,7 +2125,7 @@ public class NotebookServer extends WebSocketServlet GUI formsSettings = new GUI(); formsSettings.setForms(note.getNoteForms()); formsSettings.setParams(note.getNoteParams()); - connectionManager.broadcast(note.getId(), + getConnectionManager().broadcast(note.getId(), new Message(OP.SAVE_NOTE_FORMS).put("formsData", formsSettings)); } @@ -2155,14 +2159,14 @@ public class NotebookServer 
extends WebSocketServlet @ManagedAttribute public Set<String> getConnectedUsers() { - return connectionManager.getConnectedUsers(); + return getConnectionManager().getConnectedUsers(); } @ManagedOperation public void sendMessage(String message) { Message m = new Message(OP.NOTICE); m.data.put("notice", message); - connectionManager.broadcast(m); + getConnectionManager().broadcast(m); } private ServiceContext getServiceContext(Message message) { diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterEventTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterEventTest.java index 9514def..912ed39 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterEventTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterEventTest.java @@ -100,7 +100,7 @@ public class ClusterEventTest extends ZeppelinServerMock { ZeppelinServerMock.startUp("ClusterEventTest", zconf); notebook = TestUtils.getInstance(Notebook.class); - authorizationService = new AuthorizationService(notebook, zconf); + authorizationService = TestUtils.getInstance(AuthorizationService.class); schedulerService = new QuartzSchedulerService(zconf, notebook); notebookServer = spy(NotebookServer.getInstance()); diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java index f035c6c..185de1f 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java @@ -79,7 +79,6 @@ import org.junit.Test; public class NotebookServerTest extends AbstractTestRestApi { private static Notebook notebook; private static NotebookServer notebookServer; - private static SchedulerService schedulerService; private static NotebookService notebookService; private static AuthorizationService authorizationService; private 
HttpServletRequest mockRequest; @@ -89,17 +88,9 @@ public class NotebookServerTest extends AbstractTestRestApi { public static void init() throws Exception { AbstractTestRestApi.startUp(NotebookServerTest.class.getSimpleName()); notebook = TestUtils.getInstance(Notebook.class); - authorizationService = new AuthorizationService(notebook, notebook.getConf()); - ZeppelinConfiguration conf = ZeppelinConfiguration.create(); - schedulerService = new QuartzSchedulerService(conf, notebook); - notebookServer = spy(NotebookServer.getInstance()); - notebookService = - new NotebookService( - notebook, authorizationService, conf, schedulerService); - - ConfigurationService configurationService = new ConfigurationService(notebook.getConf()); - when(notebookServer.getNotebookService()).thenReturn(notebookService); - when(notebookServer.getConfigurationService()).thenReturn(configurationService); + authorizationService = TestUtils.getInstance(AuthorizationService.class); + notebookServer = TestUtils.getInstance(NotebookServer.class); + notebookService = TestUtils.getInstance(NotebookService.class); } @AfterClass @@ -110,26 +101,19 @@ public class NotebookServerTest extends AbstractTestRestApi { @Before public void setUp() { mockRequest = mock(HttpServletRequest.class); - anonymous = new AuthenticationInfo("anonymous"); + anonymous = AuthenticationInfo.ANONYMOUS; } @Test public void checkOrigin() throws UnknownHostException { - NotebookServer server = new NotebookServer(); - server.setNotebook(() -> notebook); - server.setNotebookService(() -> notebookService); String origin = "http://" + InetAddress.getLocalHost().getHostName() + ":8080"; - assertTrue("Origin " + origin + " is not allowed. 
Please check your hostname.", - server.checkOrigin(mockRequest, origin)); + notebookServer.checkOrigin(mockRequest, origin)); } @Test public void checkInvalidOrigin(){ - NotebookServer server = new NotebookServer(); - server.setNotebook(() -> notebook); - server.setNotebookService(() -> notebookService); - assertFalse(server.checkOrigin(mockRequest, "http://evillocalhost:8080")); + assertFalse(notebookServer.checkOrigin(mockRequest, "http://evillocalhost:8080")); } @Test @@ -201,226 +185,241 @@ public class NotebookServerTest extends AbstractTestRestApi { @Test public void testMakeSureNoAngularObjectBroadcastToWebsocketWhoFireTheEvent() throws IOException, InterruptedException { - // create a notebook - Note note1 = notebook.createNote("note1", anonymous); - - // get reference to interpreterGroup - InterpreterGroup interpreterGroup = null; - List<InterpreterSetting> settings = notebook.getInterpreterSettingManager().get(); - for (InterpreterSetting setting : settings) { - if (setting.getName().equals("md")) { - interpreterGroup = setting.getOrCreateInterpreterGroup("anonymous", "sharedProcess"); - break; + Note note1 = null; + try { + // create a notebook + note1 = notebook.createNote("note1", anonymous); + + // get reference to interpreterGroup + InterpreterGroup interpreterGroup = null; + List<InterpreterSetting> settings = notebook.getInterpreterSettingManager().get(); + for (InterpreterSetting setting : settings) { + if (setting.getName().equals("md")) { + interpreterGroup = setting.getOrCreateInterpreterGroup("anonymous", "sharedProcess"); + break; + } } - } - // start interpreter process - Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS); - p1.setText("%md start remote interpreter process"); - p1.setAuthenticationInfo(anonymous); - note1.run(p1.getId()); - - // wait for paragraph finished - while (true) { - if (p1.getStatus() == Job.Status.FINISHED) { - break; + // start interpreter process + Paragraph p1 = 
note1.addNewParagraph(AuthenticationInfo.ANONYMOUS); + p1.setText("%md start remote interpreter process"); + p1.setAuthenticationInfo(anonymous); + note1.run(p1.getId()); + + // wait for paragraph finished + while (true) { + if (p1.getStatus() == Job.Status.FINISHED) { + break; + } + Thread.sleep(100); } - Thread.sleep(100); - } - // sleep for 1 second to make sure job running thread finish to fire event. See ZEPPELIN-3277 - Thread.sleep(1000); + // sleep for 1 second to make sure job running thread finish to fire event. See ZEPPELIN-3277 + Thread.sleep(1000); - // add angularObject - interpreterGroup.getAngularObjectRegistry().add("object1", "value1", note1.getId(), null); + // add angularObject + interpreterGroup.getAngularObjectRegistry().add("object1", "value1", note1.getId(), null); - // create two sockets and open it - NotebookSocket sock1 = createWebSocket(); - NotebookSocket sock2 = createWebSocket(); - - assertEquals(sock1, sock1); - assertNotEquals(sock1, sock2); + // create two sockets and open it + NotebookSocket sock1 = createWebSocket(); + NotebookSocket sock2 = createWebSocket(); - notebookServer.onOpen(sock1); - notebookServer.onOpen(sock2); - verify(sock1, times(0)).send(anyString()); // getNote, getAngularObject - // open the same notebook from sockets - notebookServer.onMessage(sock1, new Message(OP.GET_NOTE).put("id", note1.getId()).toJson()); - notebookServer.onMessage(sock2, new Message(OP.GET_NOTE).put("id", note1.getId()).toJson()); + assertEquals(sock1, sock1); + assertNotEquals(sock1, sock2); - reset(sock1); - reset(sock2); + notebookServer.onOpen(sock1); + notebookServer.onOpen(sock2); + verify(sock1, times(0)).send(anyString()); // getNote, getAngularObject + // open the same notebook from sockets + notebookServer.onMessage(sock1, new Message(OP.GET_NOTE).put("id", note1.getId()).toJson()); + notebookServer.onMessage(sock2, new Message(OP.GET_NOTE).put("id", note1.getId()).toJson()); - // update object from sock1 - 
notebookServer.onMessage(sock1, - new Message(OP.ANGULAR_OBJECT_UPDATED) - .put("noteId", note1.getId()) - .put("name", "object1") - .put("value", "value1") - .put("interpreterGroupId", interpreterGroup.getId()).toJson()); + reset(sock1); + reset(sock2); + // update object from sock1 + notebookServer.onMessage(sock1, + new Message(OP.ANGULAR_OBJECT_UPDATED) + .put("noteId", note1.getId()) + .put("name", "object1") + .put("value", "value1") + .put("interpreterGroupId", interpreterGroup.getId()).toJson()); - // expect object is broadcasted except for where the update is created - verify(sock1, times(0)).send(anyString()); - verify(sock2, times(1)).send(anyString()); - notebook.removeNote(note1.getId(), anonymous); + // expect object is broadcasted except for where the update is created + verify(sock1, times(0)).send(anyString()); + verify(sock2, times(1)).send(anyString()); + } finally { + if (note1 != null) { + notebook.removeNote(note1.getId(), anonymous); + } + } } @Test public void testAngularObjectSaveToNote() throws IOException, InterruptedException { // create a notebook - Note note1 = notebook.createNote("note1", "angular", anonymous); - - // get reference to interpreterGroup - InterpreterGroup interpreterGroup = null; - List<InterpreterSetting> settings = note1.getBindedInterpreterSettings(new ArrayList<>()); - for (InterpreterSetting setting : settings) { - if (setting.getName().equals("angular")) { - interpreterGroup = setting.getOrCreateInterpreterGroup("anonymous", "sharedProcess"); - break; + Note note1 = null; + try { + note1 = notebook.createNote("note1", "angular", anonymous); + + // get reference to interpreterGroup + InterpreterGroup interpreterGroup = null; + List<InterpreterSetting> settings = note1.getBindedInterpreterSettings(new ArrayList<>()); + for (InterpreterSetting setting : settings) { + if (setting.getName().equals("angular")) { + interpreterGroup = setting.getOrCreateInterpreterGroup("anonymous", "sharedProcess"); + break; + } } - } - 
// start interpreter process - Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS); - p1.setText("%angular <h2>Bind here : {{COMMAND_TYPE}}</h2>"); - p1.setAuthenticationInfo(anonymous); - note1.run(p1.getId()); - - // wait for paragraph finished - while (true) { - if (p1.getStatus() == Job.Status.FINISHED) { - break; + // start interpreter process + Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS); + p1.setText("%angular <h2>Bind here : {{COMMAND_TYPE}}</h2>"); + p1.setAuthenticationInfo(anonymous); + note1.run(p1.getId()); + + // wait for paragraph finished + while (true) { + if (p1.getStatus() == Job.Status.FINISHED) { + break; + } + Thread.sleep(100); + } + // sleep for 1 second to make sure job running thread finish to fire event. See ZEPPELIN-3277 + Thread.sleep(1000); + + // create two sockets and open it + NotebookSocket sock1 = createWebSocket(); + + notebookServer.onOpen(sock1); + verify(sock1, times(0)).send(anyString()); // getNote, getAngularObject + // open the same notebook from sockets + notebookServer.onMessage(sock1, new Message(OP.GET_NOTE).put("id", note1.getId()).toJson()); + + reset(sock1); + + // bind object from sock1 + notebookServer.onMessage(sock1, + new Message(OP.ANGULAR_OBJECT_CLIENT_BIND) + .put("noteId", note1.getId()) + .put("paragraphId", p1.getId()) + .put("name", "COMMAND_TYPE") + .put("value", "COMMAND_TYPE_VALUE") + .put("interpreterGroupId", interpreterGroup.getId()).toJson()); + List<AngularObject> list = note1.getAngularObjects("angular-shared_process"); + assertEquals(list.size(), 1); + assertEquals(list.get(0).getNoteId(), note1.getId()); + assertEquals(list.get(0).getParagraphId(), p1.getId()); + assertEquals(list.get(0).getName(), "COMMAND_TYPE"); + assertEquals(list.get(0).get(), "COMMAND_TYPE_VALUE"); + // Check if the interpreterGroup AngularObjectRegistry is updated + Map<String, Map<String, AngularObject>> mapRegistry = interpreterGroup.getAngularObjectRegistry().getRegistry(); + 
AngularObject ao = mapRegistry.get(note1.getId() + "_" + p1.getId()).get("COMMAND_TYPE"); + assertEquals(ao.getName(), "COMMAND_TYPE"); + assertEquals(ao.get(), "COMMAND_TYPE_VALUE"); + + // update bind object from sock1 + notebookServer.onMessage(sock1, + new Message(OP.ANGULAR_OBJECT_UPDATED) + .put("noteId", note1.getId()) + .put("paragraphId", p1.getId()) + .put("name", "COMMAND_TYPE") + .put("value", "COMMAND_TYPE_VALUE_UPDATE") + .put("interpreterGroupId", interpreterGroup.getId()).toJson()); + list = note1.getAngularObjects("angular-shared_process"); + assertEquals(list.size(), 1); + assertEquals(list.get(0).getNoteId(), note1.getId()); + assertEquals(list.get(0).getParagraphId(), p1.getId()); + assertEquals(list.get(0).getName(), "COMMAND_TYPE"); + assertEquals(list.get(0).get(), "COMMAND_TYPE_VALUE_UPDATE"); + // Check if the interpreterGroup AngularObjectRegistry is updated + mapRegistry = interpreterGroup.getAngularObjectRegistry().getRegistry(); + AngularObject ao1 = mapRegistry.get(note1.getId() + "_" + p1.getId()).get("COMMAND_TYPE"); + assertEquals(ao1.getName(), "COMMAND_TYPE"); + assertEquals(ao1.get(), "COMMAND_TYPE_VALUE_UPDATE"); + + // unbind object from sock1 + notebookServer.onMessage(sock1, + new Message(OP.ANGULAR_OBJECT_CLIENT_UNBIND) + .put("noteId", note1.getId()) + .put("paragraphId", p1.getId()) + .put("name", "COMMAND_TYPE") + .put("value", "COMMAND_TYPE_VALUE") + .put("interpreterGroupId", interpreterGroup.getId()).toJson()); + list = note1.getAngularObjects("angular-shared_process"); + assertEquals(list.size(), 0); + // Check if the interpreterGroup AngularObjectRegistry is delete + mapRegistry = interpreterGroup.getAngularObjectRegistry().getRegistry(); + AngularObject ao2 = mapRegistry.get(note1.getId() + "_" + p1.getId()).get("COMMAND_TYPE"); + assertNull(ao2); + } finally { + if (note1 != null) { + notebook.removeNote(note1.getId(), anonymous); } - Thread.sleep(100); } - // sleep for 1 second to make sure job running thread 
finish to fire event. See ZEPPELIN-3277 - Thread.sleep(1000); - - // create two sockets and open it - NotebookSocket sock1 = createWebSocket(); - - notebookServer.onOpen(sock1); - verify(sock1, times(0)).send(anyString()); // getNote, getAngularObject - // open the same notebook from sockets - notebookServer.onMessage(sock1, new Message(OP.GET_NOTE).put("id", note1.getId()).toJson()); - - reset(sock1); - - // bind object from sock1 - notebookServer.onMessage(sock1, - new Message(OP.ANGULAR_OBJECT_CLIENT_BIND) - .put("noteId", note1.getId()) - .put("paragraphId", p1.getId()) - .put("name", "COMMAND_TYPE") - .put("value", "COMMAND_TYPE_VALUE") - .put("interpreterGroupId", interpreterGroup.getId()).toJson()); - List<AngularObject> list = note1.getAngularObjects("angular-shared_process"); - assertEquals(list.size(), 1); - assertEquals(list.get(0).getNoteId(), note1.getId()); - assertEquals(list.get(0).getParagraphId(), p1.getId()); - assertEquals(list.get(0).getName(), "COMMAND_TYPE"); - assertEquals(list.get(0).get(), "COMMAND_TYPE_VALUE"); - // Check if the interpreterGroup AngularObjectRegistry is updated - Map<String, Map<String, AngularObject>> mapRegistry = interpreterGroup.getAngularObjectRegistry().getRegistry(); - AngularObject ao = mapRegistry.get(note1.getId()+"_"+p1.getId()).get("COMMAND_TYPE"); - assertEquals(ao.getName(), "COMMAND_TYPE"); - assertEquals(ao.get(), "COMMAND_TYPE_VALUE"); - - // update bind object from sock1 - notebookServer.onMessage(sock1, - new Message(OP.ANGULAR_OBJECT_UPDATED) - .put("noteId", note1.getId()) - .put("paragraphId", p1.getId()) - .put("name", "COMMAND_TYPE") - .put("value", "COMMAND_TYPE_VALUE_UPDATE") - .put("interpreterGroupId", interpreterGroup.getId()).toJson()); - list = note1.getAngularObjects("angular-shared_process"); - assertEquals(list.size(), 1); - assertEquals(list.get(0).getNoteId(), note1.getId()); - assertEquals(list.get(0).getParagraphId(), p1.getId()); - assertEquals(list.get(0).getName(), "COMMAND_TYPE"); 
- assertEquals(list.get(0).get(), "COMMAND_TYPE_VALUE_UPDATE"); - // Check if the interpreterGroup AngularObjectRegistry is updated - mapRegistry = interpreterGroup.getAngularObjectRegistry().getRegistry(); - AngularObject ao1 = mapRegistry.get(note1.getId()+"_"+p1.getId()).get("COMMAND_TYPE"); - assertEquals(ao1.getName(), "COMMAND_TYPE"); - assertEquals(ao1.get(), "COMMAND_TYPE_VALUE_UPDATE"); - - // unbind object from sock1 - notebookServer.onMessage(sock1, - new Message(OP.ANGULAR_OBJECT_CLIENT_UNBIND) - .put("noteId", note1.getId()) - .put("paragraphId", p1.getId()) - .put("name", "COMMAND_TYPE") - .put("value", "COMMAND_TYPE_VALUE") - .put("interpreterGroupId", interpreterGroup.getId()).toJson()); - list = note1.getAngularObjects("angular-shared_process"); - assertEquals(list.size(), 0); - // Check if the interpreterGroup AngularObjectRegistry is delete - mapRegistry = interpreterGroup.getAngularObjectRegistry().getRegistry(); - AngularObject ao2 = mapRegistry.get(note1.getId()+"_"+p1.getId()).get("COMMAND_TYPE"); - assertNull(ao2); - - notebook.removeNote(note1.getId(), anonymous); } @Test public void testLoadAngularObjectFromNote() throws IOException, InterruptedException { // create a notebook - Note note1 = notebook.createNote("note1", anonymous); - - // get reference to interpreterGroup - InterpreterGroup interpreterGroup = null; - List<InterpreterSetting> settings = notebook.getInterpreterSettingManager().get(); - for (InterpreterSetting setting : settings) { - if (setting.getName().equals("angular")) { - interpreterGroup = setting.getOrCreateInterpreterGroup("anonymous", "sharedProcess"); - break; + Note note1 = null; + try { + note1 = notebook.createNote("note1", anonymous); + + // get reference to interpreterGroup + InterpreterGroup interpreterGroup = null; + List<InterpreterSetting> settings = notebook.getInterpreterSettingManager().get(); + for (InterpreterSetting setting : settings) { + if (setting.getName().equals("angular")) { + interpreterGroup 
= setting.getOrCreateInterpreterGroup("anonymous", "sharedProcess"); + break; + } } - } - // start interpreter process - Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS); - p1.setText("%angular <h2>Bind here : {{COMMAND_TYPE}}</h2>"); - p1.setAuthenticationInfo(anonymous); - note1.run(p1.getId()); - - // wait for paragraph finished - while (true) { - if (p1.getStatus() == Job.Status.FINISHED) { - break; + // start interpreter process + Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS); + p1.setText("%angular <h2>Bind here : {{COMMAND_TYPE}}</h2>"); + p1.setAuthenticationInfo(anonymous); + note1.run(p1.getId()); + + // wait for paragraph finished + while (true) { + if (p1.getStatus() == Job.Status.FINISHED) { + break; + } + Thread.sleep(100); + } + // sleep for 1 second to make sure job running thread finish to fire event. See ZEPPELIN-3277 + Thread.sleep(1000); + + // set note AngularObject + AngularObject ao = new AngularObject("COMMAND_TYPE", "COMMAND_TYPE_VALUE", note1.getId(), p1.getId(), null); + note1.addOrUpdateAngularObject("angular-shared_process", ao); + + // create sockets and open it + NotebookSocket sock1 = createWebSocket(); + notebookServer.onOpen(sock1); + + // Check the AngularObjectRegistry of the interpreterGroup before executing GET_NOTE + Map<String, Map<String, AngularObject>> mapRegistry1 = interpreterGroup.getAngularObjectRegistry().getRegistry(); + assertEquals(mapRegistry1.size(), 0); + + // open the notebook from sockets, AngularObjectRegistry that triggers the update of the interpreterGroup + notebookServer.onMessage(sock1, new Message(OP.GET_NOTE).put("id", note1.getId()).toJson()); + Thread.sleep(1000); + + // After executing GET_NOTE, check the AngularObjectRegistry of the interpreterGroup + Map<String, Map<String, AngularObject>> mapRegistry2 = interpreterGroup.getAngularObjectRegistry().getRegistry(); + assertEquals(mapRegistry1.size(), 2); + AngularObject ao1 = mapRegistry2.get(note1.getId() + "_" 
+ p1.getId()).get("COMMAND_TYPE"); + assertEquals(ao1.getName(), "COMMAND_TYPE"); + assertEquals(ao1.get(), "COMMAND_TYPE_VALUE"); + } finally { + if (note1 != null) { + notebook.removeNote(note1.getId(), anonymous); } - Thread.sleep(100); } - // sleep for 1 second to make sure job running thread finish to fire event. See ZEPPELIN-3277 - Thread.sleep(1000); - - // set note AngularObject - AngularObject ao = new AngularObject("COMMAND_TYPE", "COMMAND_TYPE_VALUE", note1.getId(), p1.getId(), null); - note1.addOrUpdateAngularObject("angular-shared_process", ao); - - // create sockets and open it - NotebookSocket sock1 = createWebSocket(); - notebookServer.onOpen(sock1); - - // Check the AngularObjectRegistry of the interpreterGroup before executing GET_NOTE - Map<String, Map<String, AngularObject>> mapRegistry1 = interpreterGroup.getAngularObjectRegistry().getRegistry(); - assertEquals(mapRegistry1.size(), 0); - - // open the notebook from sockets, AngularObjectRegistry that triggers the update of the interpreterGroup - notebookServer.onMessage(sock1, new Message(OP.GET_NOTE).put("id", note1.getId()).toJson()); - Thread.sleep(1000); - - // After executing GET_NOTE, check the AngularObjectRegistry of the interpreterGroup - Map<String, Map<String, AngularObject>> mapRegistry2 = interpreterGroup.getAngularObjectRegistry().getRegistry(); - assertEquals(mapRegistry1.size(), 2); - AngularObject ao1 = mapRegistry2.get(note1.getId()+"_"+p1.getId()).get("COMMAND_TYPE"); - assertEquals(ao1.getName(), "COMMAND_TYPE"); - assertEquals(ao1.get(), "COMMAND_TYPE_VALUE"); - - notebook.removeNote(note1.getId(), anonymous); } @Test @@ -434,18 +433,23 @@ public class NotebookServerTest extends AbstractTestRestApi { Message messageReceived = notebookServer.deserializeMessage(msg); Note note = null; try { - note = notebookServer.importNote(null, messageReceived); - } catch (NullPointerException e) { - //broadcastNoteList(); failed nothing to worry. 
- LOG.error("Exception in NotebookServerTest while testImportNotebook, failed nothing to " + - "worry ", e); - } + try { + note = notebookServer.importNote(null, messageReceived); + } catch (NullPointerException e) { + //broadcastNoteList(); failed nothing to worry. + LOG.error("Exception in NotebookServerTest while testImportNotebook, failed nothing to " + + "worry ", e); + } - assertNotEquals(null, notebook.getNote(note.getId())); - assertEquals("Test Zeppelin notebook import", notebook.getNote(note.getId()).getName()); - assertEquals("Test paragraphs import", notebook.getNote(note.getId()).getParagraphs().get(0) - .getText()); - notebook.removeNote(note.getId(), anonymous); + assertNotEquals(null, notebook.getNote(note.getId())); + assertEquals("Test Zeppelin notebook import", notebook.getNote(note.getId()).getName()); + assertEquals("Test paragraphs import", notebook.getNote(note.getId()).getParagraphs().get(0) + .getText()); + } finally { + if (note != null) { + notebook.removeNote(note.getId(), anonymous); + } + } } @Test @@ -456,20 +460,25 @@ public class NotebookServerTest extends AbstractTestRestApi { Message messageReceived = notebookServer.deserializeMessage(msg); Note note = null; try { - note = notebookServer.importNote(null, messageReceived); - } catch (NullPointerException e) { - //broadcastNoteList(); failed nothing to worry. - LOG.error("Exception in NotebookServerTest while testImportJupyterNote, failed nothing to " + - "worry ", e); - } + try { + note = notebookServer.importNote(null, messageReceived); + } catch (NullPointerException e) { + //broadcastNoteList(); failed nothing to worry. 
+ LOG.error("Exception in NotebookServerTest while testImportJupyterNote, failed nothing to " + + "worry ", e); + } - assertNotEquals(null, notebook.getNote(note.getId())); - assertTrue(notebook.getNote(note.getId()).getName(), - notebook.getNote(note.getId()).getName().startsWith("Note converted from Jupyter_")); - assertEquals("md", notebook.getNote(note.getId()).getParagraphs().get(0).getIntpText()); - assertEquals("# matplotlib - 2D and 3D plotting in Python", - notebook.getNote(note.getId()).getParagraphs().get(0).getScriptText()); - notebook.removeNote(note.getId(), anonymous); + assertNotEquals(null, notebook.getNote(note.getId())); + assertTrue(notebook.getNote(note.getId()).getName(), + notebook.getNote(note.getId()).getName().startsWith("Note converted from Jupyter_")); + assertEquals("md", notebook.getNote(note.getId()).getParagraphs().get(0).getIntpText()); + assertEquals("# matplotlib - 2D and 3D plotting in Python", + notebook.getNote(note.getId()).getParagraphs().get(0).getScriptText()); + } finally { + if (note != null) { + notebook.removeNote(note.getId(), anonymous); + } + } } @Test @@ -483,46 +492,51 @@ public class NotebookServerTest extends AbstractTestRestApi { .put("value", value) .put("paragraphId", "paragraphId"); - final Notebook notebook = mock(Notebook.class); - final NotebookServer server = new NotebookServer(); - server.setNotebook(() -> notebook); - server.setNotebookService(() -> notebookService); - final Note note = mock(Note.class, RETURNS_DEEP_STUBS); + try { + final Notebook notebook = mock(Notebook.class); + notebookServer.setNotebook(() -> notebook); + notebookServer.setNotebookService(() -> notebookService); + final Note note = mock(Note.class, RETURNS_DEEP_STUBS); - when(notebook.getNote("noteId")).thenReturn(note); - final Paragraph paragraph = mock(Paragraph.class, RETURNS_DEEP_STUBS); - when(note.getParagraph("paragraphId")).thenReturn(paragraph); + when(notebook.getNote("noteId")).thenReturn(note); + final Paragraph 
paragraph = mock(Paragraph.class, RETURNS_DEEP_STUBS); + when(note.getParagraph("paragraphId")).thenReturn(paragraph); - final RemoteAngularObjectRegistry mdRegistry = mock(RemoteAngularObjectRegistry.class); - final InterpreterGroup mdGroup = new InterpreterGroup("mdGroup"); - mdGroup.setAngularObjectRegistry(mdRegistry); + final RemoteAngularObjectRegistry mdRegistry = mock(RemoteAngularObjectRegistry.class); + final InterpreterGroup mdGroup = new InterpreterGroup("mdGroup"); + mdGroup.setAngularObjectRegistry(mdRegistry); - when(paragraph.getBindedInterpreter().getInterpreterGroup()).thenReturn(mdGroup); + when(paragraph.getBindedInterpreter().getInterpreterGroup()).thenReturn(mdGroup); - final AngularObject<String> ao1 = AngularObjectBuilder.build(varName, value, "noteId", - "paragraphId"); + final AngularObject<String> ao1 = AngularObjectBuilder.build(varName, value, "noteId", + "paragraphId"); - when(mdRegistry.addAndNotifyRemoteProcess(varName, value, "noteId", "paragraphId")) - .thenReturn(ao1); + when(mdRegistry.addAndNotifyRemoteProcess(varName, value, "noteId", "paragraphId")) + .thenReturn(ao1); - NotebookSocket conn = mock(NotebookSocket.class); - NotebookSocket otherConn = mock(NotebookSocket.class); + NotebookSocket conn = mock(NotebookSocket.class); + NotebookSocket otherConn = mock(NotebookSocket.class); - final String mdMsg1 = server.serializeMessage(new Message(OP.ANGULAR_OBJECT_UPDATE) - .put("angularObject", ao1) - .put("interpreterGroupId", "mdGroup") - .put("noteId", "noteId") - .put("paragraphId", "paragraphId")); + final String mdMsg1 = notebookServer.serializeMessage(new Message(OP.ANGULAR_OBJECT_UPDATE) + .put("angularObject", ao1) + .put("interpreterGroupId", "mdGroup") + .put("noteId", "noteId") + .put("paragraphId", "paragraphId")); - server.getConnectionManager().noteSocketMap.put("noteId", asList(conn, otherConn)); + notebookServer.getConnectionManager().noteSocketMap.put("noteId", asList(conn, otherConn)); - // When - 
server.angularObjectClientBind(conn, messageReceived); + // When + notebookServer.angularObjectClientBind(conn, messageReceived); - // Then - verify(mdRegistry, never()).addAndNotifyRemoteProcess(varName, value, "noteId", null); + // Then + verify(mdRegistry, never()).addAndNotifyRemoteProcess(varName, value, "noteId", null); - verify(otherConn).send(mdMsg1); + verify(otherConn).send(mdMsg1); + } finally { + // reset these so that it won't affect other tests + notebookServer.setNotebook(() -> NotebookServerTest.notebook); + notebookServer.setNotebookService(() -> NotebookServerTest.notebookService); + } } @Test @@ -535,42 +549,47 @@ public class NotebookServerTest extends AbstractTestRestApi { .put("name", varName) .put("paragraphId", "paragraphId"); - final Notebook notebook = mock(Notebook.class); - final NotebookServer server = new NotebookServer(); - server.setNotebook(() -> notebook); - server.setNotebookService(() -> notebookService); - final Note note = mock(Note.class, RETURNS_DEEP_STUBS); - when(notebook.getNote("noteId")).thenReturn(note); - final Paragraph paragraph = mock(Paragraph.class, RETURNS_DEEP_STUBS); - when(note.getParagraph("paragraphId")).thenReturn(paragraph); - - final RemoteAngularObjectRegistry mdRegistry = mock(RemoteAngularObjectRegistry.class); - final InterpreterGroup mdGroup = new InterpreterGroup("mdGroup"); - mdGroup.setAngularObjectRegistry(mdRegistry); - - when(paragraph.getBindedInterpreter().getInterpreterGroup()).thenReturn(mdGroup); - - final AngularObject<String> ao1 = AngularObjectBuilder.build(varName, value, "noteId", - "paragraphId"); - when(mdRegistry.removeAndNotifyRemoteProcess(varName, "noteId", "paragraphId")).thenReturn(ao1); - NotebookSocket conn = mock(NotebookSocket.class); - NotebookSocket otherConn = mock(NotebookSocket.class); - - final String mdMsg1 = server.serializeMessage(new Message(OP.ANGULAR_OBJECT_REMOVE) - .put("angularObject", ao1) - .put("interpreterGroupId", "mdGroup") - .put("noteId", "noteId") - 
.put("paragraphId", "paragraphId")); + try { + final Notebook notebook = mock(Notebook.class); + notebookServer.setNotebook(() -> notebook); + notebookServer.setNotebookService(() -> notebookService); + final Note note = mock(Note.class, RETURNS_DEEP_STUBS); + when(notebook.getNote("noteId")).thenReturn(note); + final Paragraph paragraph = mock(Paragraph.class, RETURNS_DEEP_STUBS); + when(note.getParagraph("paragraphId")).thenReturn(paragraph); - server.getConnectionManager().noteSocketMap.put("noteId", asList(conn, otherConn)); + final RemoteAngularObjectRegistry mdRegistry = mock(RemoteAngularObjectRegistry.class); + final InterpreterGroup mdGroup = new InterpreterGroup("mdGroup"); + mdGroup.setAngularObjectRegistry(mdRegistry); - // When - server.angularObjectClientUnbind(conn, messageReceived); + when(paragraph.getBindedInterpreter().getInterpreterGroup()).thenReturn(mdGroup); - // Then - verify(mdRegistry, never()).removeAndNotifyRemoteProcess(varName, "noteId", null); + final AngularObject<String> ao1 = AngularObjectBuilder.build(varName, value, "noteId", + "paragraphId"); + when(mdRegistry.removeAndNotifyRemoteProcess(varName, "noteId", "paragraphId")).thenReturn(ao1); + NotebookSocket conn = mock(NotebookSocket.class); + NotebookSocket otherConn = mock(NotebookSocket.class); - verify(otherConn).send(mdMsg1); + final String mdMsg1 = notebookServer.serializeMessage(new Message(OP.ANGULAR_OBJECT_REMOVE) + .put("angularObject", ao1) + .put("interpreterGroupId", "mdGroup") + .put("noteId", "noteId") + .put("paragraphId", "paragraphId")); + + notebookServer.getConnectionManager().noteSocketMap.put("noteId", asList(conn, otherConn)); + + // When + notebookServer.angularObjectClientUnbind(conn, messageReceived); + + // Then + verify(mdRegistry, never()).removeAndNotifyRemoteProcess(varName, "noteId", null); + + verify(otherConn).send(mdMsg1); + } finally { + // reset these so that it won't affect other tests + notebookServer.setNotebook(() -> 
NotebookServerTest.notebook); + notebookServer.setNotebookService(() -> NotebookServerTest.notebookService); + } } @Test @@ -676,7 +695,7 @@ public class NotebookServerTest extends AbstractTestRestApi { Paragraph p1 = note.addNewParagraph(anonymous); p1.setText("%md start remote interpreter process"); p1.setAuthenticationInfo(anonymous); - notebookServer.getNotebook().saveNote(note, anonymous); + notebook.saveNote(note, anonymous); String noteId = note.getId(); String user1Id = "user1", user2Id = "user2";