This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new 91ecc00 [ZEPPELIN-4894]. All notes are loaded into memory when enable recovering 91ecc00 is described below commit 91ecc00f0c33b720026ec76878081b278180f469 Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Mon Jun 29 06:54:48 2020 +0800 [ZEPPELIN-4894]. All notes are loaded into memory when enable recovering ### What is this PR for? Several improvement changes in this PR: * use Stream<Note> instead of List<Note>, so that we don't need to load all notes into memory and can save memory by calling Note.unLoad * interface refactoring of AngularObjectRegistryListener * Change Notebook.removeNote(noteId) to Notebook.removeNote(Note), because internally we still need noteId to get Note. ### What type of PR is it? [Improvement | Refactoring] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-4894 ### How should this be tested? * CI pass ### Screenshots (if appropriate) ### Questions: * Does the licenses files need update? No * Is there breaking changes for older versions? No * Does this needs documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #3810 from zjffdu/ZEPPELIN-4894 and squashes the following commits: 2e26dcc27 [Jeff Zhang] save 19aaa39da [Jeff Zhang] tmp save 6d4fdec40 [Jeff Zhang] [ZEPPELIN-4894]. All notes are loaded into memory when enable recovering --- .../apache/zeppelin/shell/BaseInterpreterTest.java | 12 +-- .../zeppelin/submarine/BaseInterpreterTest.java | 12 +-- .../integration/ZeppelinSparkClusterTest.java | 42 +++++----- .../org/apache/zeppelin/display/AngularObject.java | 6 +- .../zeppelin/display/AngularObjectRegistry.java | 14 ++-- .../display/AngularObjectRegistryListener.java | 8 +- .../remote/RemoteInterpreterEventClient.java | 20 ++--- .../display/AngularObjectRegistryTest.java | 9 +-- zeppelin-server/pom.xml | 1 + .../org/apache/zeppelin/rest/NotebookRestApi.java | 2 +- .../org/apache/zeppelin/server/ZeppelinServer.java | 4 +- .../apache/zeppelin/service/JobManagerService.java | 5 +- .../apache/zeppelin/service/NotebookService.java | 7 +- .../org/apache/zeppelin/socket/NotebookServer.java | 92 ++++++++++++++-------- .../src/main/resources/log4j.properties | 25 ------ .../apache/zeppelin/cluster/ClusterEventTest.java | 13 +-- .../org/apache/zeppelin/recovery/RecoveryTest.java | 21 +++-- .../zeppelin/rest/InterpreterRestApiTest.java | 4 +- .../apache/zeppelin/rest/NotebookRestApiTest.java | 84 ++++++++++++-------- .../apache/zeppelin/rest/ZeppelinRestApiTest.java | 44 +++++------ .../apache/zeppelin/socket/NotebookServerTest.java | 18 ++--- .../src/test/resources/log4j.properties | 2 +- .../java/org/apache/zeppelin/notebook/Note.java | 21 +++-- .../org/apache/zeppelin/notebook/NoteManager.java | 29 ++++--- .../org/apache/zeppelin/notebook/Notebook.java | 71 +++++++++++------ .../org/apache/zeppelin/search/LuceneSearch.java | 13 ++- .../org/apache/zeppelin/search/SearchService.java | 3 +- .../helium/HeliumApplicationFactoryTest.java | 10 +-- .../remote/RemoteAngularObjectTest.java | 6 +- .../org/apache/zeppelin/notebook/NotebookTest.java | 72 ++++++++--------- .../notebook/repo/NotebookRepoSyncTest.java | 6 +- 31 files changed, 376 insertions(+), 300 deletions(-) diff --git a/shell/src/test/java/org/apache/zeppelin/shell/BaseInterpreterTest.java b/shell/src/test/java/org/apache/zeppelin/shell/BaseInterpreterTest.java index e93c3dd..76c18f1 100644 --- a/shell/src/test/java/org/apache/zeppelin/shell/BaseInterpreterTest.java +++ b/shell/src/test/java/org/apache/zeppelin/shell/BaseInterpreterTest.java @@ -48,20 +48,20 @@ public abstract class BaseInterpreterTest { new AngularObjectRegistryListener() { @Override - public void onAdd(String interpreterGroupId, AngularObject object) { + public void onAddAngularObject(String interpreterGroupId, + AngularObject angularObject) { onAdd.incrementAndGet(); } @Override - public void onUpdate(String interpreterGroupId, AngularObject object) { + public void onUpdateAngularObject(String interpreterGroupId, + AngularObject angularObject) { onUpdate.incrementAndGet(); } @Override - public void onRemove(String interpreterGroupId, - String name, - String noteId, - String paragraphId) { + public void onRemoveAngularObject(String interpreterGroupId, + AngularObject angularObject) { onRemove.incrementAndGet(); } }); diff --git a/submarine/src/test/java/org/apache/zeppelin/submarine/BaseInterpreterTest.java b/submarine/src/test/java/org/apache/zeppelin/submarine/BaseInterpreterTest.java index 6b8febb..4cc6d3b 100644 --- a/submarine/src/test/java/org/apache/zeppelin/submarine/BaseInterpreterTest.java +++ b/submarine/src/test/java/org/apache/zeppelin/submarine/BaseInterpreterTest.java @@ -48,20 +48,20 @@ public abstract class BaseInterpreterTest { new AngularObjectRegistryListener() { @Override - public void onAdd(String interpreterGroupId, AngularObject object) { + public void onAddAngularObject(String interpreterGroupId, + AngularObject angularObject) { onAdd.incrementAndGet(); } @Override - public void onUpdate(String interpreterGroupId, AngularObject object) { + public void onUpdateAngularObject(String interpreterGroupId, + AngularObject angularObject) { onUpdate.incrementAndGet(); } @Override - public void onRemove(String interpreterGroupId, - String name, - String noteId, - String paragraphId) { + public void onRemoveAngularObject(String interpreterGroupId, + AngularObject angularObject) { onRemove.incrementAndGet(); } }); diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java index 2eb505a..21e626e 100644 --- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java +++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java @@ -191,7 +191,7 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi { assertEquals(Status.ABORT, p.getStatus()); } finally { if (null != note) { - TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note, anonymous); } } } @@ -208,7 +208,7 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi { assertEquals("55", p.getReturn().message().get(0).getData()); } finally { if (null != note) { - TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note, anonymous); } } } @@ -240,7 +240,7 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi { } } finally { if (null != note) { - TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note, anonymous); } } } @@ -267,7 +267,7 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi { "org.apache.spark.sql.DataFrame = [_c0: string, _c1: string]\n")); } finally { if (null != note) { - TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note, anonymous); } } } @@ -364,7 +364,7 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi { } } finally { if (null != note) { - TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note, anonymous); } } } @@ -397,7 +397,7 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi { assertEquals("[1] 3", p.getReturn().message().get(0).getData().trim()); } finally { if (null != note) { - TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note, anonymous); } } } @@ -510,7 +510,7 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi { } } finally { if (null != note) { - TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note, anonymous); } } } @@ -589,10 +589,10 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi { assertEquals("1", p21.getReturn().message().get(0).getData()); } finally { if (null != note) { - TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note, anonymous); } if (null != note2) { - TestUtils.getInstance(Notebook.class).removeNote(note2.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note2, anonymous); } } } @@ -637,7 +637,7 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi { assertEquals("hello world\n", p5.getReturn().message().get(0).getData()); } finally { if (null != note) { - TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note, anonymous); } } } @@ -674,10 +674,10 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi { assertEquals("1\n6\n2\n", p3.getReturn().message().get(0).getData()); } finally { if (null != note) { - TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note, anonymous); } if (null != note2) { - TestUtils.getInstance(Notebook.class).removeNote(note2.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note2, anonymous); } } } @@ -702,7 +702,7 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi { p.getReturn().message().get(0).getData().contains(sparkVersion)); } finally { if (null != note) { - TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note, anonymous); } } } @@ -753,7 +753,7 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi { assertEquals("items: Seq[Any] = Buffer(2)", result[4]); } finally { if (null != note) { - TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note, anonymous); } } } @@ -790,7 +790,7 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi { assertEquals("2", result[3]); } finally { if (null != note) { - TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note, anonymous); } } } @@ -842,7 +842,7 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi { assertEquals(0, globalAngularObjects.size()); } finally { if (null != note) { - TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note, anonymous); } } } @@ -906,7 +906,7 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi { assertTrue(p2.getReturn().toString(), p2.getReturn().toString().contains("hello java,scala")); } finally { if (null != note) { - TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note, anonymous); } } } @@ -970,7 +970,7 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi { assertTrue(p2.getReturn().toString(), p2.getReturn().toString().contains("hello java,scala")); } finally { if (null != note) { - TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note, anonymous); } } } @@ -1000,7 +1000,7 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi { assertTrue(p2.getReturn().toString(), p2.getReturn().toString().contains("hello world")); } finally { if (null != note) { - TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note, anonymous); } } } @@ -1022,7 +1022,7 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi { assertEquals(Status.FINISHED, p1.getStatus()); } finally { if (null != note) { - TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note, anonymous); } } } @@ -1052,7 +1052,7 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi { p1.getReturn().message().get(0).getData().contains("No such file or directory")); } finally { if (null != note) { - TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note, anonymous); } // reset SPARK_HOME, otherwise it will cause the following test fail InterpreterSetting sparkIntpSetting = TestUtils.getInstance(Notebook.class).getInterpreterSettingManager() diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObject.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObject.java index 1959e3d..f3238f4 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObject.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObject.java @@ -37,7 +37,7 @@ import java.util.concurrent.ExecutorService; */ public class AngularObject<T> implements JsonSerializable { private static final Logger LOGGER = LoggerFactory.getLogger(AngularObject.class); - private static final Gson gson = new Gson(); + private static final Gson GSON = new Gson(); private String name; private T object; @@ -258,10 +258,10 @@ public class AngularObject<T> implements JsonSerializable { } public String toJson() { - return gson.toJson(this); + return GSON.toJson(this); } public static AngularObject fromJson(String json) { - return gson.fromJson(json, AngularObject.class); + return GSON.fromJson(json, AngularObject.class); } } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectRegistry.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectRegistry.java index c2e86b6..54d9814 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectRegistry.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectRegistry.java @@ -34,18 +34,18 @@ public class AngularObjectRegistry { Map<String, Map<String, AngularObject>> registry = new HashMap<>(); private final String GLOBAL_KEY = "_GLOBAL_"; private AngularObjectRegistryListener listener; - private String interpreterId; + private String interpreterGroupId; private AngularObjectListener angularObjectListener; - public AngularObjectRegistry(final String interpreterId, + public AngularObjectRegistry(final String interpreterGroupId, final AngularObjectRegistryListener listener) { - this.interpreterId = interpreterId; + this.interpreterGroupId = interpreterGroupId; this.listener = listener; angularObjectListener = new AngularObjectListener() { @Override public void updated(AngularObject updatedObject) { if (listener != null) { - listener.onUpdate(interpreterId, updatedObject); + listener.onUpdateAngularObject(interpreterGroupId, updatedObject); } } }; @@ -117,7 +117,7 @@ public class AngularObjectRegistry { Map<String, AngularObject> noteLocalRegistry = getRegistryForKey(noteId, paragraphId); noteLocalRegistry.put(name, ao); if (listener != null && emit) { - listener.onAdd(interpreterId, ao); + listener.onAddAngularObject(interpreterGroupId, ao); } } @@ -159,7 +159,7 @@ public class AngularObjectRegistry { Map<String, AngularObject> r = getRegistryForKey(noteId, paragraphId); AngularObject o = r.remove(name); if (listener != null && emit) { - listener.onRemove(interpreterId, name, noteId, paragraphId); + listener.onRemoveAngularObject(interpreterGroupId, o); } return o; } @@ -240,7 +240,7 @@ public class AngularObjectRegistry { } public String getInterpreterGroupId() { - return interpreterId; + return interpreterGroupId; } public Map<String, Map<String, AngularObject>> getRegistry() { diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectRegistryListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectRegistryListener.java index 081bb43..77350b6 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectRegistryListener.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectRegistryListener.java @@ -19,10 +19,10 @@ package org.apache.zeppelin.display; /** * - * + * Listener class for angular object operations, such as add, update, remove. */ public interface AngularObjectRegistryListener { - void onAdd(String interpreterGroupId, AngularObject object); - void onUpdate(String interpreterGroupId, AngularObject object); - void onRemove(String interpreterGroupId, String name, String noteId, String paragraphId); + void onAddAngularObject(String interpreterGroupId, AngularObject angularObject); + void onUpdateAngularObject(String interpreterGroupId, AngularObject angularObject); + void onRemoveAngularObject(String interpreterGroupId, AngularObject angularObject); } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java index eadbf24..99c6d37 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java @@ -341,35 +341,37 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector, } @Override - public synchronized void onAdd(String interpreterGroupId, AngularObject object) { + public synchronized void onAddAngularObject(String interpreterGroupId, AngularObject angularObject) { try { callRemoteFunction(client -> { - client.addAngularObject(intpGroupId, object.toJson()); + client.addAngularObject(intpGroupId, angularObject.toJson()); return null; }); } catch (Exception e) { - LOGGER.warn("Fail to add AngularObject: " + object, e); + LOGGER.warn("Fail to add AngularObject: " + angularObject, e); } } @Override - public void onUpdate(String interpreterGroupId, AngularObject object) { + public void onUpdateAngularObject(String interpreterGroupId, AngularObject angularObject) { try { callRemoteFunction(client -> { - client.updateAngularObject(intpGroupId, object.toJson()); + client.updateAngularObject(intpGroupId, angularObject.toJson()); return null; }); } catch (Exception e) { - LOGGER.warn("Fail to update AngularObject: " + object, e); + LOGGER.warn("Fail to update AngularObject: " + angularObject, e); } } @Override - public void onRemove(String interpreterGroupId, String name, String noteId, - String paragraphId) { + public void onRemoveAngularObject(String interpreterGroupId, AngularObject angularObject) { try { callRemoteFunction(client -> { - client.removeAngularObject(intpGroupId, noteId, paragraphId, name); + client.removeAngularObject(intpGroupId, + angularObject.getNoteId(), + angularObject.getParagraphId(), + angularObject.getName()); return null; }); } catch (Exception e) { diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectRegistryTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectRegistryTest.java index 529284f..988e6ea 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectRegistryTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectRegistryTest.java @@ -37,20 +37,17 @@ public class AngularObjectRegistryTest { new AngularObjectRegistryListener() { @Override - public void onAdd(String interpreterGroupId, AngularObject object) { + public void onAddAngularObject(String interpreterGroupId, AngularObject angularObject) { onAdd.incrementAndGet(); } @Override - public void onUpdate(String interpreterGroupId, AngularObject object) { + public void onUpdateAngularObject(String interpreterGroupId, AngularObject angularObject) { onUpdate.incrementAndGet(); } @Override - public void onRemove(String interpreterGroupId, - String name, - String noteId, - String paragraphId) { + public void onRemoveAngularObject(String interpreterGroupId, AngularObject angularObject) { onRemove.incrementAndGet(); } }); diff --git a/zeppelin-server/pom.xml b/zeppelin-server/pom.xml index 891a409..c0d549f 100644 --- a/zeppelin-server/pom.xml +++ b/zeppelin-server/pom.xml @@ -382,6 +382,7 @@ <plugin> <artifactId>maven-surefire-plugin</artifactId> <configuration combine.children="append"> + <forkMode>always</forkMode> <argLine>-Xmx3g -Xms1g -Dfile.encoding=UTF-8</argLine> <excludes> <exclude>${tests.to.exclude}</exclude> diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java index dc5cb26..b4f136c 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java @@ -678,7 +678,7 @@ public class NotebookRestApi extends AbstractRestApi { params = request.getParams(); } - LOG.info("Run note jobs, noteId: {} blocking: {}, isolated: {}, params: {}", noteId, blocking, isolated, params); + LOG.info("Run note jobs, noteId: {}, blocking: {}, isolated: {}, params: {}", noteId, blocking, isolated, params); Note note = notebook.getNote(noteId); AuthenticationInfo subject = new AuthenticationInfo(authenticationService.getPrincipal()); subject.setRoles(new LinkedList<>(authenticationService.getAssociatedRoles())); diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java index b0a33d2..6c30c34 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java @@ -267,7 +267,9 @@ public class ZeppelinServer extends ResourceConfig { // Try to get Notebook from ServiceLocator, because Notebook instantiation is lazy, it is // created when user open zeppelin in browser if we don't get it explicitly here. // Lazy loading will cause paragraph recovery and cron job initialization is delayed. - sharedServiceLocator.getService(Notebook.class); + Notebook notebook = sharedServiceLocator.getService(Notebook.class); + // Try to recover here, don't do it in constructor of Notebook, because it would cause deadlock. + notebook.recoveryIfNecessary(); // when zeppelin is started inside of ide (especially for eclipse) // for graceful shutdown, input any key in console window diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/service/JobManagerService.java b/zeppelin-server/src/main/java/org/apache/zeppelin/service/JobManagerService.java index f42534d..405f78d 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/service/JobManagerService.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/service/JobManagerService.java @@ -75,14 +75,13 @@ public class JobManagerService { if (!conf.isJobManagerEnabled()) { return new ArrayList<>(); } - List<Note> notes = notebook.getAllNotes(); List<NoteJobInfo> notesJobInfo = new ArrayList<>(); - for (Note note : notes) { + notebook.getNoteStream().forEach(note -> { NoteJobInfo noteJobInfo = new NoteJobInfo(note); if (noteJobInfo.unixTimeLastRun > lastUpdateServerUnixTime) { notesJobInfo.add(noteJobInfo); } - } + }); callback.onSuccess(notesJobInfo, context); return notesJobInfo; } diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java b/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java index 71a67af..c73eaf5 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java @@ -190,11 +190,12 @@ public class NotebookService { public void removeNote(String noteId, ServiceContext context, ServiceCallback<String> callback) throws IOException { - if (notebook.getNote(noteId) != null) { - if (!checkPermission(noteId, Permission.OWNER, Message.OP.DEL_NOTE, context, callback)) { + Note note = notebook.getNote(noteId); + if (note != null) { + if (!checkPermission(note.getId(), Permission.OWNER, Message.OP.DEL_NOTE, context, callback)) { return; } - notebook.removeNote(noteId, context.getAutheInfo()); + notebook.removeNote(note, context.getAutheInfo()); callback.onSuccess("Delete note successfully", context); } else { callback.onFailure(new NoteNotFoundException(noteId), context); diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index 2637e97..aefc61e 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -56,7 +56,6 @@ import org.apache.zeppelin.helium.ApplicationEventListener; import org.apache.zeppelin.helium.HeliumPackage; import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterResult; -import org.apache.zeppelin.interpreter.InterpreterResultMessage; import org.apache.zeppelin.interpreter.InterpreterSetting; import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener; @@ -1970,53 +1969,80 @@ public class NotebookServer extends WebSocketServlet } @Override - public void onAdd(String interpreterGroupId, AngularObject object) { - onUpdate(interpreterGroupId, object); + public void onAddAngularObject(String interpreterGroupId, AngularObject angularObject) { + onUpdateAngularObject(interpreterGroupId, angularObject); } @Override - public void onUpdate(String interpreterGroupId, AngularObject object) { + public void onUpdateAngularObject(String interpreterGroupId, AngularObject angularObject) { if (getNotebook() == null) { return; } - List<Note> notes = getNotebook().getAllNotes(); - for (Note note : notes) { - if (object.getNoteId() != null && !note.getId().equals(object.getNoteId())) { - continue; - } - - List<InterpreterSetting> intpSettings = - note.getBindedInterpreterSettings( - new ArrayList<>(getNotebookAuthorizationService().getOwners(note.getId()))); - if (intpSettings.isEmpty()) { - continue; + // not global scope, so we just need to load the corresponded note. + if (angularObject.getNoteId() != null) { + try { + Note note = getNotebook().getNote(angularObject.getNoteId()); + updateNoteAngularObject(note, angularObject, interpreterGroupId); + } catch (IOException e) { + LOG.error("AngularObject's note: {} is not found", angularObject.getNoteId()); } + } else { + // global scope angular object needs to load and iterate all notes, this is inefficient. + getNotebook().getNoteStream().forEach(note -> { + if (angularObject.getNoteId() != null && !note.getId().equals(angularObject.getNoteId())) { + return; + } + updateNoteAngularObject(note, angularObject, interpreterGroupId); + }); + } + } - getConnectionManager().broadcast(note.getId(), new Message(OP.ANGULAR_OBJECT_UPDATE) - .put("angularObject", object) - .put("interpreterGroupId", interpreterGroupId).put("noteId", note.getId()) - .put("paragraphId", object.getParagraphId())); + private void updateNoteAngularObject(Note note, AngularObject angularObject, String interpreterGroupId) { + List<InterpreterSetting> intpSettings = + note.getBindedInterpreterSettings( + new ArrayList<>(getNotebookAuthorizationService().getOwners(note.getId()))); + if (intpSettings.isEmpty()) { + return; } + getConnectionManager().broadcast(note.getId(), new Message(OP.ANGULAR_OBJECT_UPDATE) + .put("angularObject", angularObject) + .put("interpreterGroupId", interpreterGroupId).put("noteId", note.getId()) + .put("paragraphId", angularObject.getParagraphId())); } @Override - public void onRemove(String interpreterGroupId, String name, String noteId, String paragraphId) { - List<Note> notes = getNotebook().getAllNotes(); - for (Note note : notes) { - if (noteId != null && !note.getId().equals(noteId)) { - continue; + public void onRemoveAngularObject(String interpreterGroupId, AngularObject angularObject) { + // not global scope, so we just need to load the corresponded note. + if (angularObject.getNoteId() != null) { + try { + Note note = getNotebook().getNote(angularObject.getNoteId()); + removeNoteAngularObject(angularObject.getNoteId(), angularObject, interpreterGroupId); + } catch (IOException e) { + LOG.error("AngularObject's note: {} is not found", angularObject.getNoteId()); } - - List<String> settingIds = - getNotebook().getInterpreterSettingManager().getSettingIds(); - for (String id : settingIds) { - if (interpreterGroupId.contains(id)) { - getConnectionManager().broadcast(note.getId(), - new Message(OP.ANGULAR_OBJECT_REMOVE).put("name", name).put("noteId", noteId) - .put("paragraphId", paragraphId)); - break; + } else { + // global scope angular object needs to load and iterate all notes, this is inefficient. + getNotebook().getNoteStream().forEach(note -> { + if (angularObject.getNoteId() != null && !note.getId().equals(angularObject.getNoteId())) { + return; } + removeNoteAngularObject(note.getId(), angularObject, interpreterGroupId); + }); + } + } + + private void removeNoteAngularObject(String noteId, AngularObject angularObject, String interpreterGroupId) { + List<String> settingIds = + getNotebook().getInterpreterSettingManager().getSettingIds(); + for (String id : settingIds) { + if (interpreterGroupId.contains(id)) { + getConnectionManager().broadcast(noteId, + new Message(OP.ANGULAR_OBJECT_REMOVE) + .put("name", angularObject.getName()) + .put("noteId", angularObject.getNoteId()) + .put("paragraphId", angularObject.getParagraphId())); + break; } } } diff --git a/zeppelin-server/src/main/resources/log4j.properties b/zeppelin-server/src/main/resources/log4j.properties deleted file mode 100644 index 2f64407..0000000 --- a/zeppelin-server/src/main/resources/log4j.properties +++ /dev/null @@ -1,25 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -log4j.rootLogger = INFO, stdout - -log4j.appender.stdout = org.apache.log4j.ConsoleAppender -log4j.appender.stdout.layout = org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n - -log4j.additivity.org.apache.zeppelin.interpreter = false -log4j.logger.org.apache.zeppelin.interpreter = DEBUG, stdout diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterEventTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterEventTest.java index 90ba134..95e1caa 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterEventTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterEventTest.java @@ -316,7 +316,7 @@ public class ClusterEventTest extends ZeppelinServerMock { } finally { // cleanup if (null != note) { - TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note, anonymous); } } } @@ -354,10 +354,11 @@ public class ClusterEventTest extends ZeppelinServerMock { } finally { // cleanup if (null != note1) { - TestUtils.getInstance(Notebook.class).removeNote(note1.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note1, anonymous); } - if (null != clonedNoteId) { - TestUtils.getInstance(Notebook.class).removeNote(clonedNoteId, anonymous); + Note clonedNote = TestUtils.getInstance(Notebook.class).getNote(clonedNoteId); + if (null != clonedNote) { + TestUtils.getInstance(Notebook.class).removeNote(clonedNote, anonymous); } } } @@ -389,7 +390,7 @@ public class ClusterEventTest extends ZeppelinServerMock { } finally { // cleanup if (null != note) { - TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note, anonymous); } } } @@ -455,7 +456,7 @@ public class ClusterEventTest extends ZeppelinServerMock { LOGGER.error(e.getMessage(), e); } finally { if (null != note) { - notebook.removeNote(note.getId(), anonymous); + notebook.removeNote(note, anonymous); } } } diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/recovery/RecoveryTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/recovery/RecoveryTest.java index 85fea77..435330b 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/recovery/RecoveryTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/recovery/RecoveryTest.java @@ -100,6 +100,8 @@ public class RecoveryTest extends AbstractTestRestApi { // run the paragraph again, but change the text to print variable `user` note1 = TestUtils.getInstance(Notebook.class).getNote(note1.getId()); + Thread.sleep(10 * 1000); + note1 = TestUtils.getInstance(Notebook.class).getNote(note1.getId()); p1 = note1.getParagraph(p1.getId()); p1.setText("%python print(user)"); post = httpPost("/notebook/job/" + note1.getId() + "?blocking=true", ""); @@ -112,7 +114,7 @@ public class RecoveryTest extends AbstractTestRestApi { throw e; } finally { if (null != note1) { - TestUtils.getInstance(Notebook.class).removeNote(note1.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note1, anonymous); } } } @@ -145,6 +147,7 @@ public class RecoveryTest extends AbstractTestRestApi { shutDown(); startUp(RecoveryTest.class.getSimpleName(), false); + Thread.sleep(5 * 1000); // run the paragraph again, but change the text to print variable `user`. // can not recover the python interpreter, because it has been shutdown. note1 = TestUtils.getInstance(Notebook.class).getNote(note1.getId()); @@ -159,7 +162,7 @@ public class RecoveryTest extends AbstractTestRestApi { throw e; } finally { if (null != note1) { - TestUtils.getInstance(Notebook.class).removeNote(note1.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note1, anonymous); } } } @@ -189,6 +192,7 @@ public class RecoveryTest extends AbstractTestRestApi { startUp(RecoveryTest.class.getSimpleName(), false); + Thread.sleep(5 * 1000); // run the paragraph again, but change the text to print variable `user`. // can not recover the python interpreter, because it has been shutdown. note1 = TestUtils.getInstance(Notebook.class).getNote(note1.getId()); @@ -203,7 +207,7 @@ public class RecoveryTest extends AbstractTestRestApi { throw e; } finally { if (null != note1) { - TestUtils.getInstance(Notebook.class).removeNote(note1.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note1, anonymous); } } } @@ -248,12 +252,13 @@ public class RecoveryTest extends AbstractTestRestApi { assertEquals(Job.Status.FINISHED, p1.getStatus()); assertEquals("hello\n", p1.getReturn().message().get(0).getData()); + Thread.sleep(5 * 1000); } catch (Exception e ) { LOG.error(e.toString(), e); throw e; } finally { if (null != note1) { - TestUtils.getInstance(Notebook.class).removeNote(note1.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note1, anonymous); } } } @@ -270,7 +275,7 @@ public class RecoveryTest extends AbstractTestRestApi { note1 = TestUtils.getInstance(Notebook.class).createNote("note4", AuthenticationInfo.ANONYMOUS); - // run sh paragraph async, print 'hello' after 10 seconds + // run paragraph async, print 'hello' after 10 seconds Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS); p1.setText("%python import time\n" + "for i in range(1, 10):\n" + @@ -288,9 +293,11 @@ public class RecoveryTest extends AbstractTestRestApi { // shutdown zeppelin and restart it shutDown(); // sleep 15 seconds to make sure the paragraph is finished - Thread.sleep(10 * 1500); + Thread.sleep(15 * 1000); startUp(RecoveryTest.class.getSimpleName(), false); + // sleep 10 seconds to make sure recovering is finished + Thread.sleep(10 * 1000); assertEquals(Job.Status.FINISHED, p1.getStatus()); assertEquals("1\n" + @@ -307,7 +314,7 @@ public class RecoveryTest extends AbstractTestRestApi { throw e; } finally { if (null != note1) { - TestUtils.getInstance(Notebook.class).removeNote(note1.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note1, anonymous); } } } diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/InterpreterRestApiTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/InterpreterRestApiTest.java index 48d6c51..cb96df2 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/InterpreterRestApiTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/InterpreterRestApiTest.java @@ -337,7 +337,7 @@ public class InterpreterRestApiTest extends AbstractTestRestApi { getSimulatedMarkdownResult("markdown restarted")); } finally { if (null != note) { - TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note, anonymous); } } } @@ -394,7 +394,7 @@ public class InterpreterRestApiTest extends AbstractTestRestApi { } finally { if (null != note) { - TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note, anonymous); } } } diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookRestApiTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookRestApiTest.java index c05df07..6039bf2 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookRestApiTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookRestApiTest.java @@ -81,6 +81,7 @@ public class NotebookRestApiTest extends AbstractTestRestApi { @Test public void testGetNoteParagraphJobStatus() throws IOException { + LOG.info("Running testGetNoteParagraphJobStatus"); Note note1 = null; try { note1 = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous); @@ -100,13 +101,14 @@ public class NotebookRestApiTest extends AbstractTestRestApi { } finally { // cleanup if (null != note1) { - TestUtils.getInstance(Notebook.class).removeNote(note1.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note1, anonymous); } } } @Test public void testRunParagraphJob() throws IOException { + LOG.info("Running testRunParagraphJob"); Note note1 = null; try { note1 = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous); @@ -135,13 +137,14 @@ public class NotebookRestApiTest extends AbstractTestRestApi { } finally { // cleanup if (null != note1) { - TestUtils.getInstance(Notebook.class).removeNote(note1.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note1, anonymous); } } } @Test public void testRunParagraphSynchronously() throws IOException { + LOG.info("Running testRunParagraphSynchronously"); Note note1 = null; try { note1 = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous); @@ -191,31 +194,32 @@ public class NotebookRestApiTest extends AbstractTestRestApi { } finally { // cleanup if (null != note1) { - TestUtils.getInstance(Notebook.class).removeNote(note1.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note1, anonymous); } } } @Test public void testRunNoteBlocking() throws IOException { + LOG.info("Running testRunNoteBlocking"); Note note1 = null; try { note1 = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous); // 2 paragraphs // P1: // %python + // from __future__ import print_function // import time // time.sleep(1) // user='abc' // P2: // %python - // from __future__ import print_function // print(user) // Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS); Paragraph p2 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS); - p1.setText("%python import time\ntime.sleep(1)\nuser='abc'"); - p2.setText("%python from __future__ import print_function\nprint(user)"); + p1.setText("%python from __future__ import print_function\nimport time\ntime.sleep(1)\nuser='abc'"); + p2.setText("%python print(user)"); PostMethod post = httpPost("/notebook/job/" + note1.getId() + "?blocking=true", ""); assertThat(post, isAllowed()); @@ -230,13 +234,14 @@ public class NotebookRestApiTest extends AbstractTestRestApi { } finally { // cleanup if (null != note1) { - TestUtils.getInstance(Notebook.class).removeNote(note1.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note1, anonymous); } } } @Test public void testRunNoteNonBlocking() throws Exception { + LOG.info("Running testRunNoteNonBlocking"); Note note1 = null; try { note1 = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous); @@ -272,13 +277,14 @@ public class NotebookRestApiTest extends AbstractTestRestApi { } finally { // cleanup if (null != note1) { - TestUtils.getInstance(Notebook.class).removeNote(note1.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note1, anonymous); } } } @Test public void testRunNoteBlocking_Isolated() throws IOException { + LOG.info("Running testRunNoteBlocking_Isolated"); Note note1 = null; try { InterpreterSettingManager interpreterSettingManager = @@ -290,18 +296,18 @@ public class NotebookRestApiTest extends AbstractTestRestApi { // 2 paragraphs // P1: // %python + // from __future__ import print_function // import time // time.sleep(1) // user='abc' // P2: // %python - // from __future__ import print_function // print(user) // Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS); Paragraph p2 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS); - p1.setText("%python import time\ntime.sleep(1)\nuser='abc'"); - p2.setText("%python from __future__ import print_function\nprint(user)"); + p1.setText("%python from __future__ import print_function\nimport time\ntime.sleep(1)\nuser='abc'"); + p2.setText("%python print(user)"); PostMethod post = httpPost("/notebook/job/" + note1.getId() + "?blocking=true&isolated=true", ""); assertThat(post, isAllowed()); @@ -319,13 +325,14 @@ public class NotebookRestApiTest extends AbstractTestRestApi { } finally { // cleanup if (null != note1) { - TestUtils.getInstance(Notebook.class).removeNote(note1.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note1, anonymous); } } } @Test public void testRunNoteNonBlocking_Isolated() throws IOException, InterruptedException { + LOG.info("Running testRunNoteNonBlocking_Isolated"); Note note1 = null; try { InterpreterSettingManager interpreterSettingManager = @@ -337,18 +344,18 @@ public class NotebookRestApiTest extends AbstractTestRestApi { // 2 paragraphs // P1: // %python + // from __future__ import print_function // import time // time.sleep(1) // user='abc' // P2: // %python - // from __future__ import print_function // print(user) // Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS); Paragraph p2 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS); - p1.setText("%python import time\ntime.sleep(1)\nuser='abc'"); - p2.setText("%python from __future__ import print_function\nprint(user)"); + p1.setText("%python from __future__ import print_function\nimport time\ntime.sleep(1)\nuser='abc'"); + p2.setText("%python print(user)"); PostMethod post = httpPost("/notebook/job/" + note1.getId() + "?blocking=false&isolated=true", ""); assertThat(post, isAllowed()); @@ -370,7 +377,7 @@ public class NotebookRestApiTest extends AbstractTestRestApi { } finally { // cleanup if (null != note1) { - TestUtils.getInstance(Notebook.class).removeNote(note1.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note1, anonymous); } } } @@ -433,30 +440,33 @@ public class NotebookRestApiTest extends AbstractTestRestApi { } finally { // cleanup if (null != note1) { - TestUtils.getInstance(Notebook.class).removeNote(note1.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note1, anonymous); } } } @Test public void testRunAllParagraph_FirstFailed() throws IOException { + LOG.info("Running testRunAllParagraph_FirstFailed"); Note note1 = null; try { note1 = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous); // 2 paragraphs // P1: // %python + // from __future__ import print_function // import time // time.sleep(1) - // from __future__ import print_function - // print(user) + // print(user2) + // // P2: // %python - // user='abc' + // user2='abc' + // print(user2) // Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS); Paragraph p2 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS); - p1.setText("%python import time\ntime.sleep(1)\nfrom __future__ import print_function\nprint(user2)"); + p1.setText("%python from __future__ import print_function\nimport time\ntime.sleep(1)\nprint(user2)"); p2.setText("%python user2='abc'\nprint(user2)"); PostMethod post = httpPost("/notebook/job/" + note1.getId() + "?blocking=true", ""); @@ -473,13 +483,14 @@ public class NotebookRestApiTest extends AbstractTestRestApi { } finally { // cleanup if (null != note1) { - TestUtils.getInstance(Notebook.class).removeNote(note1.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note1, anonymous); } } } @Test public void testCloneNote() throws IOException { + LOG.info("Running testCloneNote"); Note note1 = null; String clonedNoteId = null; try { @@ -503,16 +514,20 @@ public class NotebookRestApiTest extends AbstractTestRestApi { } finally { // cleanup if (null != note1) { - TestUtils.getInstance(Notebook.class).removeNote(note1.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note1, anonymous); } if (null != clonedNoteId) { - TestUtils.getInstance(Notebook.class).removeNote(clonedNoteId, anonymous); + Note clonedNote = TestUtils.getInstance(Notebook.class).getNote(clonedNoteId); + if (clonedNote != null) { + TestUtils.getInstance(Notebook.class).removeNote(clonedNote, anonymous); + } } } } @Test public void testRenameNote() throws IOException { + LOG.info("Running testRenameNote"); Note note = null; try { String oldName = "old_name"; @@ -531,13 +546,14 @@ public class NotebookRestApiTest extends AbstractTestRestApi { } finally { // cleanup if (null != note) { - TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note, anonymous); } } } @Test public void testUpdateParagraphConfig() throws IOException { + LOG.info("Running testUpdateParagraphConfig"); Note note = null; try { note = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous); @@ -563,13 +579,14 @@ public class NotebookRestApiTest extends AbstractTestRestApi { } finally { // cleanup if (null != note) { - TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note, anonymous); } } } @Test public void testClearAllParagraphOutput() throws IOException { + LOG.info("Running testClearAllParagraphOutput"); Note note = null; try { // Create note and set result explicitly @@ -606,31 +623,32 @@ public class NotebookRestApiTest extends AbstractTestRestApi { } finally { // cleanup if (null != note) { - TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note, anonymous); } } } @Test public void testRunWithServerRestart() throws Exception { + LOG.info("Running testRunWithServerRestart"); Note note1 = null; try { note1 = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous); // 2 paragraphs // P1: // %python + // from __future__ import print_function // import time // time.sleep(1) - // from __future__ import print_function - // print(user) + // user='abc' // P2: // %python - // user='abc' + // print(user) // Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS); Paragraph p2 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS); - p1.setText("%python import time\ntime.sleep(1)\nuser='abc'"); - p2.setText("%python from __future__ import print_function\nprint(user)"); + p1.setText("%python from __future__ import print_function\nimport time\ntime.sleep(1)\nuser='abc'"); + p2.setText("%python print(user)"); PostMethod post1 = httpPost("/notebook/job/" + note1.getId() + "?blocking=true", ""); assertThat(post1, isAllowed()); @@ -662,7 +680,7 @@ public class NotebookRestApiTest extends AbstractTestRestApi { } finally { // cleanup if (null != note1) { - TestUtils.getInstance(Notebook.class).removeNote(note1.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note1, anonymous); } } } diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java index 4a63eba..65faed8 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java @@ -126,7 +126,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi { assertEquals(paragraphText, paragraphs.get(0).get("text")); } finally { if (null != note) { - TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note, anonymous); } } } @@ -188,7 +188,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi { } } // cleanup - TestUtils.getInstance(Notebook.class).removeNote(newNoteId, anonymous); + TestUtils.getInstance(Notebook.class).removeNote(newNote, anonymous); post.releaseConnection(); } @@ -214,7 +214,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi { } assertEquals("compare note name", noteName, newNoteName); // cleanup - TestUtils.getInstance(Notebook.class).removeNote(newNoteId, anonymous); + TestUtils.getInstance(Notebook.class).removeNote(newNote, anonymous); post.releaseConnection(); } @@ -230,7 +230,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi { } finally { if (null != note) { if (TestUtils.getInstance(Notebook.class).getNote(note.getId()) != null) { - TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note, anonymous); } } } @@ -273,7 +273,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi { get.releaseConnection(); } finally { if (null != note) { - TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note, anonymous); } } } @@ -302,7 +302,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi { // get note content as JSON oldJson = getNoteContent(sourceNoteId); // delete it first then import it - TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note, anonymous); // call note post PostMethod importPost = httpPost("/notebook/import/", oldJson); @@ -321,12 +321,12 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi { } finally { if (null != note) { if (TestUtils.getInstance(Notebook.class).getNote(note.getId()) != null) { - TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note, anonymous); } } if (null != newNote) { if (TestUtils.getInstance(Notebook.class).getNote(newNote.getId()) != null) { - TestUtils.getInstance(Notebook.class).removeNote(newNote.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(newNote, anonymous); } } } @@ -403,10 +403,10 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi { } finally { //cleanup if (null != note) { - TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note, anonymous); } if (null != newNote) { - TestUtils.getInstance(Notebook.class).removeNote(newNote.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(newNote, anonymous); } } } @@ -484,7 +484,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi { } finally { //cleanup if (null != note) { - TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note, anonymous); } } } @@ -539,7 +539,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi { } finally { //cleanup if (null != note) { - TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note, anonymous); } } } @@ -581,7 +581,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi { } finally { //cleanup if (null != note) { - TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note, anonymous); } } } @@ -630,7 +630,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi { } finally { //cleanup if (null != note) { - TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note, anonymous); } System.clearProperty(ConfVars.ZEPPELIN_NOTEBOOK_CRON_ENABLE.getVarName()); } @@ -680,7 +680,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi { } finally { //cleanup if (null != note) { - TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note, anonymous); } System.clearProperty(ConfVars.ZEPPELIN_NOTEBOOK_CRON_ENABLE.getVarName()); } @@ -709,7 +709,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi { } finally { //cleanup if (null != note) { - TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note, anonymous); } } } @@ -773,7 +773,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi { } finally { //cleanup if (null != note) { - TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note, anonymous); } } } @@ -822,7 +822,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi { } finally { //cleanup if (null != note) { - TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note, anonymous); } } } @@ -857,7 +857,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi { } finally { //cleanup if (null != note) { - TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note, anonymous); } } } @@ -897,7 +897,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi { } finally { //cleanup if (null != note) { - TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note, anonymous); } } } @@ -924,7 +924,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi { } finally { //cleanup if (null != note) { - TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note, anonymous); } } } @@ -960,7 +960,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi { } finally { //cleanup if (null != note) { - TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + TestUtils.getInstance(Notebook.class).removeNote(note, anonymous); } } } diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java index 185de1f..48631bd 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java @@ -172,7 +172,7 @@ public class NotebookServerTest extends AbstractTestRestApi { verify(sock1, times(++sock1SendCount)).send(anyString()); verify(sock2, times(sock2SendCount)).send(anyString()); - notebook.removeNote(createdNote.getId(), anonymous); + notebook.removeNote(createdNote, anonymous); } private void patchParagraph(NotebookSocket noteSocket, String paragraphId, String patch) { @@ -250,7 +250,7 @@ public class NotebookServerTest extends AbstractTestRestApi { verify(sock2, times(1)).send(anyString()); } finally { if (note1 != null) { - notebook.removeNote(note1.getId(), anonymous); + notebook.removeNote(note1, anonymous); } } } @@ -355,7 +355,7 @@ public class NotebookServerTest extends AbstractTestRestApi { assertNull(ao2); } finally { if (note1 != null) { - notebook.removeNote(note1.getId(), anonymous); + notebook.removeNote(note1, anonymous); } } } @@ -417,7 +417,7 @@ public class NotebookServerTest extends AbstractTestRestApi { assertEquals(ao1.get(), "COMMAND_TYPE_VALUE"); } finally { if (note1 != null) { - notebook.removeNote(note1.getId(), anonymous); + notebook.removeNote(note1, anonymous); } } } @@ -447,7 +447,7 @@ public class NotebookServerTest extends AbstractTestRestApi { .getText()); } finally { if (note != null) { - notebook.removeNote(note.getId(), anonymous); + notebook.removeNote(note, anonymous); } } } @@ -476,7 +476,7 @@ public class NotebookServerTest extends AbstractTestRestApi { notebook.getNote(note.getId()).getParagraphs().get(0).getScriptText()); } finally { if (note != null) { - notebook.removeNote(note.getId(), anonymous); + notebook.removeNote(note, anonymous); } } } @@ -635,7 +635,7 @@ public class NotebookServerTest extends AbstractTestRestApi { assertEquals(notebook.getInterpreterSettingManager().getDefaultInterpreterSetting( createdNote.getId()).getId(), defaultInterpreterId); } - notebook.removeNote(createdNote.getId(), anonymous); + notebook.removeNote(createdNote, anonymous); } @Test @@ -742,7 +742,7 @@ public class NotebookServerTest extends AbstractTestRestApi { assertNotNull(user1Id + " can get " + user2Id + "'s shared note", paragraphList2); } finally { if (null != note) { - notebook.removeNote(note.getId(), anonymous); + notebook.removeNote(note, anonymous); } } } @@ -778,7 +778,7 @@ public class NotebookServerTest extends AbstractTestRestApi { assertEquals(0, note.getParagraphCount()); } finally { if (null != note) { - notebook.removeNote(note.getId(), anonymous); + notebook.removeNote(note, anonymous); } } } diff --git a/zeppelin-server/src/test/resources/log4j.properties b/zeppelin-server/src/test/resources/log4j.properties index 70e4e8f..292305d 100644 --- a/zeppelin-server/src/test/resources/log4j.properties +++ b/zeppelin-server/src/test/resources/log4j.properties @@ -19,7 +19,7 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target=System.out log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %c:%L - %m%n +log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n #log4j.appender.stdout.layout.ConversionPattern= #%5p [%t] (%F:%L) - %m%n #%-4r [%t] %-5p %c %x - %m%n diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java index 90a2d3a..0e9fe89 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java @@ -766,14 +766,27 @@ public class Note implements JsonSerializable { boolean blocking, boolean isolated, Map<String, Object> params) throws Exception { + setIsolatedMode(isolated); + setRunning(true); + setStartTime(DATE_TIME_FORMATTER.format(LocalDateTime.now())); if (blocking) { - runAllSync(authInfo, isolated, params); + try { + runAllSync(authInfo, isolated, params); + } finally { + setRunning(false); + setIsolatedMode(false); + clearStartTime(); + } } else { ExecutorFactory.singleton().getNoteJobExecutor().submit(() -> { try { runAllSync(authInfo, isolated, params); } catch (Exception e) { LOGGER.warn("Fail to run note: " + id, e); + } finally { + setRunning(false); + setIsolatedMode(false); + clearStartTime(); } }); } @@ -786,9 +799,6 @@ public class Note implements JsonSerializable { * @param isolated */ private void runAllSync(AuthenticationInfo authInfo, boolean isolated, Map<String, Object> params) throws Exception { - setIsolatedMode(isolated); - setRunning(true); - setStartTime(DATE_TIME_FORMATTER.format(LocalDateTime.now())); try { for (Paragraph p : getParagraphs()) { if (!p.isEnabled()) { @@ -836,9 +846,6 @@ public class Note implements JsonSerializable { setting.closeInterpreters(executionContext); } } - setRunning(false); - setIsolatedMode(false); - clearStartTime(); } } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteManager.java index e19ece4..ca50618 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteManager.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteManager.java @@ -30,9 +30,11 @@ import javax.inject.Singleton; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import java.util.stream.Stream; /** * Manager class for note. It handle all the note related operations, such as get, create, @@ -86,17 +88,22 @@ public class NoteManager { return notesInfo; } - //TODO(zjffdu) This is inefficient - public List<Note> getAllNotes() { - List<Note> notes = new ArrayList<>(); - for (String notePath : notesInfo.values()) { - try { - notes.add(getNoteNode(notePath).getNote()); - } catch (Exception e) { - LOGGER.warn("Fail to load note: " + notePath, e); - } - } - return notes; + /** + * Return java stream instead of List to save memory, otherwise OOM will happen + * when there's large amount of notes. + * @return + */ + public Stream<Note> getNotesStream() { + return notesInfo.values().stream() + .map(notePath -> { + try { + return getNoteNode(notePath).getNote(); + } catch (Exception e) { + LOGGER.warn("Fail to load note: " + notePath, e); + return null; + } + }) + .filter(note -> note != null); } /** diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java index 2727436..25685a3 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java @@ -29,7 +29,10 @@ import java.util.Map; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; import javax.inject.Inject; + +import com.google.common.annotations.VisibleForTesting; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; import org.apache.zeppelin.display.AngularObject; @@ -106,23 +109,45 @@ public class Notebook { this.noteEventListeners.add(this.interpreterSettingManager); if (conf.isIndexRebuild()) { - noteSearchService.startRebuildIndex(getAllNotes()); + noteSearchService.startRebuildIndex(getNoteStream()); } } - private void recoverRunningParagraphs() { + public void recoveryIfNecessary() { + if (conf.isRecoveryEnabled()) { + recoverRunningParagraphs(); + } + } + private void recoverRunningParagraphs() { Thread thread = new Thread(() -> { - for (Note note : getAllNotes()) { - for (Paragraph paragraph : note.getParagraphs()) { - if (paragraph.getStatus() == Job.Status.RUNNING) { - paragraph.recover(); + getNoteStream().forEach(note -> { + try { + boolean hasRecoveredParagraph = false; + for (Paragraph paragraph : note.getParagraphs()) { + if (paragraph.getStatus() == Job.Status.RUNNING) { + paragraph.recover(); + hasRecoveredParagraph = true; + } + } + // unload note to save memory when there's no paragraph recovering. + if (!hasRecoveredParagraph) { + note.unLoad(); } + } catch (Exception e) { + LOGGER.warn("Fail to recovery note: " + note.getPath(), e); } - } + }); }); thread.setName("Recovering-Thread"); thread.start(); + LOGGER.info("Start paragraph recovering thread"); + + try { + thread.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } } @Inject @@ -150,8 +175,6 @@ public class Notebook { this.noteEventListeners.add(noteEventListener); } this.paragraphJobListener = (ParagraphJobListener) noteEventListener; - - recoverRunningParagraphs(); } public NoteManager getNoteManager() { @@ -305,13 +328,9 @@ public class Notebook { return newNote; } - public void removeNote(String noteId, AuthenticationInfo subject) throws IOException { - LOGGER.info("Remove note " + noteId); - Note note = getNote(noteId); - if (note == null) { - throw new IOException("Note " + noteId + " not found"); - } - noteManager.removeNote(noteId, subject); + public void removeNote(Note note, AuthenticationInfo subject) throws IOException { + LOGGER.info("Remove note: {}", note.getId()); + noteManager.removeNote(note.getId(), subject); fireNoteRemoveEvent(note, subject); } @@ -561,9 +580,8 @@ public class Notebook { .collect(Collectors.toList()); } - public List<Note> getAllNotes() { - List<Note> noteList = noteManager.getAllNotes(); - for (Note note : noteList) { + public Stream<Note> getNoteStream() { + return noteManager.getNotesStream().map(note -> { note.setInterpreterFactory(replFactory); note.setInterpreterSettingManager(interpreterSettingManager); note.setParagraphJobListener(paragraphJobListener); @@ -573,19 +591,25 @@ public class Notebook { p.setNote(note); p.setListener(paragraphJobListener); } - } - Collections.sort(noteList, Comparator.comparing(Note::getPath)); - return noteList; + return note; + }); } + @VisibleForTesting public List<Note> getAllNotes(Function<Note, Boolean> func){ return getAllNotes().stream() .filter(note -> func.apply(note)) .collect(Collectors.toList()); } + @VisibleForTesting + public List<Note> getAllNotes() { + List<Note> notes = getNoteStream().collect(Collectors.toList()); + Collections.sort(notes, Comparator.comparing(Note::getPath)); + return notes; + } + public List<NoteInfo> getNotesInfo(Function<String, Boolean> func) { - LOGGER.info("Start getNoteList"); String homescreenNoteId = conf.getString(ConfVars.ZEPPELIN_NOTEBOOK_HOMESCREEN); boolean hideHomeScreenNotebookFromList = conf.getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_HOMESCREEN_HIDE); @@ -609,7 +633,6 @@ public class Notebook { } return name1.compareTo(name2); }); - LOGGER.info("Finish getNoteList"); return notesInfo; } } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/LuceneSearch.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/LuceneSearch.java index 58f203e..2b3a24c 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/LuceneSearch.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/LuceneSearch.java @@ -28,6 +28,7 @@ import java.util.Date; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; import javax.inject.Inject; import org.apache.lucene.analysis.Analyzer; @@ -421,15 +422,21 @@ public class LuceneSearch extends SearchService { } @Override - public void startRebuildIndex(List<Note> notes) { + public void startRebuildIndex(Stream<Note> notes) { Thread thread = new Thread(() -> { logger.info("Starting rebuild index"); - for (Note note: notes) { + notes.forEach(note -> { addIndexDoc(note); - } + note.unLoad(); + }); logger.info("Finish rebuild index"); }); thread.setName("LuceneSearch-RebuildIndex-Thread"); thread.start(); + try { + thread.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } } } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/SearchService.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/SearchService.java index caa1bc2..bb4c189 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/SearchService.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/SearchService.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.stream.Stream; import org.apache.zeppelin.notebook.Note; import org.apache.zeppelin.notebook.NoteEventAsyncListener; @@ -134,5 +135,5 @@ public abstract class SearchService extends NoteEventAsyncListener { } } - public abstract void startRebuildIndex(List<Note> notes); + public abstract void startRebuildIndex(Stream<Note> notes); } diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java index 31d4c2b..3ff965d 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java @@ -128,7 +128,7 @@ public class HeliumApplicationFactoryTest extends AbstractInterpreterTest { // clean heliumAppFactory.unload(p1, appId); - notebook.removeNote(note1.getId(), anonymous); + notebook.removeNote(note1, anonymous); } @Test @@ -165,7 +165,7 @@ public class HeliumApplicationFactoryTest extends AbstractInterpreterTest { assertEquals(ApplicationState.Status.UNLOADED, app.getStatus()); // clean - notebook.removeNote(note1.getId(), anonymous); + notebook.removeNote(note1, anonymous); } @@ -201,7 +201,7 @@ public class HeliumApplicationFactoryTest extends AbstractInterpreterTest { assertEquals(ApplicationState.Status.UNLOADED, app.getStatus()); // clean - notebook.removeNote(note1.getId(), anonymous); + notebook.removeNote(note1, anonymous); } @Test @@ -223,7 +223,7 @@ public class HeliumApplicationFactoryTest extends AbstractInterpreterTest { } // remove note - notebook.removeNote(note1.getId(), anonymous); + notebook.removeNote(note1, anonymous); } @@ -273,6 +273,6 @@ public class HeliumApplicationFactoryTest extends AbstractInterpreterTest { assertEquals(ApplicationState.Status.UNLOADED, app.getStatus()); // clean - notebook.removeNote(note1.getId(), anonymous); + notebook.removeNote(note1, anonymous); } } diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java index 1029696..6febe66 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java @@ -147,17 +147,17 @@ public class RemoteAngularObjectTest extends AbstractInterpreterTest } @Override - public void onAdd(String interpreterGroupId, AngularObject object) { + public void onAddAngularObject(String interpreterGroupId, AngularObject angularObject) { onAdd.incrementAndGet(); } @Override - public void onUpdate(String interpreterGroupId, AngularObject object) { + public void onUpdateAngularObject(String interpreterGroupId, AngularObject angularObject) { onUpdate.incrementAndGet(); } @Override - public void onRemove(String interpreterGroupId, String name, String noteId, String paragraphId) { + public void onRemoveAngularObject(String interpreterGroupId, AngularObject angularObject) { onRemove.incrementAndGet(); } diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java index d995664..ad99ca7 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java @@ -292,7 +292,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo note.run(p2.getId()); while (p2.isTerminated() == false || p2.getReturn() == null) Thread.yield(); assertEquals("repl2: hello world", p2.getReturn().message().get(0).getData()); - notebook.removeNote(note.getId(), anonymous); + notebook.removeNote(note, anonymous); } @Test @@ -385,7 +385,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo // interpreterSettingManager, null, null, null); //assertEquals(1, notebook2.getAllNotes().size()); - notebook.removeNote(note.getId(), anonymous); + notebook.removeNote(note, anonymous); } @Test @@ -398,7 +398,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo Set<String> owners = new HashSet<>(); owners.add("user1"); assertEquals(owners, authorizationService.getOwners(note.getId())); - notebook.removeNote(note.getId(), anonymous); + notebook.removeNote(note, anonymous); } @Test @@ -418,7 +418,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo // clear paragraph output/result note.clearParagraphOutput(p1.getId()); assertNull(p1.getReturn()); - notebook.removeNote(note.getId(), anonymous); + notebook.removeNote(note, anonymous); } @Test @@ -432,7 +432,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo Thread.sleep(2 * 1000); assertEquals(p1.getStatus(), Status.FINISHED); assertNull(p1.getDateStarted()); - notebook.removeNote(note.getId(), anonymous); + notebook.removeNote(note, anonymous); } @Test @@ -449,7 +449,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo assertEquals(InterpreterResult.Code.ERROR, result.code()); assertEquals("Interpreter invalid not found", result.message().get(0).getData()); assertNull(p1.getDateStarted()); - notebook.removeNote(note.getId(), anonymous); + notebook.removeNote(note, anonymous); } @Test @@ -481,7 +481,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo assertNull(p2.getReturn()); assertEquals("repl1: p3", p3.getReturn().message().get(0).getData()); - notebook.removeNote(note.getId(), anonymous); + notebook.removeNote(note, anonymous); } @Test @@ -512,7 +512,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo assertNotNull(dateFinished); Thread.sleep(2 * 1000); assertEquals(dateFinished, p.getDateFinished()); - notebook.removeNote(note.getId(), anonymous); + notebook.removeNote(note, anonymous); } @Test @@ -549,7 +549,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo } // remove the note - notebook.removeNote(note.getId(), anonymous); + notebook.removeNote(note, anonymous); } @Test @@ -667,7 +667,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo private void terminateScheduledNote(Note note) throws IOException { note.getConfig().remove("cron"); schedulerService.refreshCron(note.getId()); - notebook.removeNote(note.getId(), anonymous); + notebook.removeNote(note, anonymous); } @@ -716,7 +716,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo // make sure all paragraph has been executed assertNotNull(p.getDateFinished()); assertNotNull(p2.getDateFinished()); - notebook.removeNote(note.getId(), anonymous); + notebook.removeNote(note, anonymous); } // @Test @@ -796,8 +796,8 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo schedulerService.refreshCron(cronNote.getId()); // remove notebooks - notebook.removeNote(cronNote.getId(), anonymous); - notebook.removeNote(anotherNote.getId(), anonymous); + notebook.removeNote(cronNote, anonymous); + notebook.removeNote(anotherNote, anonymous); } @Test @@ -818,7 +818,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo // remove cron scheduler. config.remove("cron"); schedulerService.refreshCron(note.getId()); - notebook.removeNote(note.getId(), anonymous); + notebook.removeNote(note, anonymous); } @Test @@ -850,9 +850,9 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo Set<String> owners = new HashSet<>(); owners.add("user1"); assertEquals(owners, authorizationService.getOwners(importedNote2.getId())); - notebook.removeNote(note.getId(), anonymous); - notebook.removeNote(importedNote.getId(), anonymous); - notebook.removeNote(importedNote2.getId(), anonymous); + notebook.removeNote(note, anonymous); + notebook.removeNote(importedNote, anonymous); + notebook.removeNote(importedNote2, anonymous); } @Test @@ -881,9 +881,9 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo Set<String> owners = new HashSet<>(); owners.add("user1"); assertEquals(owners, authorizationService.getOwners(cloneNote2.getId())); - notebook.removeNote(note.getId(), anonymous); - notebook.removeNote(cloneNote.getId(), anonymous); - notebook.removeNote(cloneNote2.getId(), anonymous); + notebook.removeNote(note, anonymous); + notebook.removeNote(cloneNote, anonymous); + notebook.removeNote(cloneNote2, anonymous); } @Test @@ -906,7 +906,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo assertEquals(1, interpreterSettingManager.getAllResources().size()); // remove note - notebook.removeNote(note.getId(), anonymous); + notebook.removeNote(note, anonymous); assertEquals(0, interpreterSettingManager.getAllResources().size()); } @@ -931,7 +931,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo registry.add("o3", "object3", null, null); // remove notebook - notebook.removeNote(note.getId(), anonymous); + notebook.removeNote(note, anonymous); // notebook scope or paragraph scope object should be removed assertNull(registry.get("o1", note.getId(), null)); @@ -970,7 +970,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo // notebook scope and global object sould be remained assertNotNull(registry.get("o2", note.getId(), null)); assertNotNull(registry.get("o3", null, null)); - notebook.removeNote(note.getId(), anonymous); + notebook.removeNote(note, anonymous); } @Test @@ -996,7 +996,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo // New InterpreterGroup will be created and its AngularObjectRegistry will be created assertNull(registry.get("o1", note.getId(), null)); assertNull(registry.get("o2", null, null)); - notebook.removeNote(note.getId(), anonymous); + notebook.removeNote(note, anonymous); } @Test @@ -1049,7 +1049,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo assertEquals(authorizationService.isReader(note.getId(), new HashSet<>(Arrays.asList("user4"))), true); - notebook.removeNote(note.getId(), anonymous); + notebook.removeNote(note, anonymous); } @Test @@ -1183,7 +1183,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo assertEquals(Status.ABORT, p2.getStatus()); assertEquals(Status.READY, p3.getStatus()); - notebook.removeNote(note.getId(), anonymous); + notebook.removeNote(note, anonymous); } @Test @@ -1205,7 +1205,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo InterpreterResult result = p1.getReturn(); // remove note and recreate - notebook.removeNote(note1.getId(), anonymous); + notebook.removeNote(note1, anonymous); note1 = notebook.createNote("note1", anonymous); p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS); p1.setText("%mock1 getId"); @@ -1215,7 +1215,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo while (p1.getStatus() != Status.FINISHED) Thread.yield(); assertNotEquals(p1.getReturn().message(), result.message()); - notebook.removeNote(note1.getId(), anonymous); + notebook.removeNote(note1, anonymous); } @Test @@ -1257,8 +1257,8 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo assertNotEquals(p1.getReturn().message(), p2.getReturn().message().get(0).getData()); - notebook.removeNote(note1.getId(), anonymous); - notebook.removeNote(note2.getId(), anonymous); + notebook.removeNote(note1, anonymous); + notebook.removeNote(note2, anonymous); } @@ -1315,8 +1315,8 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo assertNotEquals(p1.getReturn().message().get(0).getData(), p2.getReturn().message().get(0).getData()); - notebook.removeNote(note1.getId(), anonymous); - notebook.removeNote(note2.getId(), anonymous); + notebook.removeNote(note1, anonymous); + notebook.removeNote(note2, anonymous); } public void testNotebookEventListener() throws IOException { @@ -1375,7 +1375,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo note1.removeParagraph(anonymous.getUser(), p1.getId()); assertEquals(1, onParagraphRemove.get()); - notebook.removeNote(note1.getId(), anonymous); + notebook.removeNote(note1, anonymous); assertEquals(1, onNoteRemove.get()); assertEquals(1, onParagraphRemove.get()); } @@ -1400,8 +1400,8 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo assertEquals(0, notebook.getAllNotes(note -> authorizationService.isReader(note.getId(), Sets.newHashSet("anonymous"))).size()); assertEquals(1, notebook.getAllNotes(note -> authorizationService.isReader(note.getId(), Sets.newHashSet("user1"))).size()); assertEquals(1, notebook.getAllNotes(note -> authorizationService.isReader(note.getId(), Sets.newHashSet("user2"))).size()); - notebook.removeNote(note1.getId(), AuthenticationInfo.ANONYMOUS); - notebook.removeNote(note2.getId(), AuthenticationInfo.ANONYMOUS); + notebook.removeNote(note1, AuthenticationInfo.ANONYMOUS); + notebook.removeNote(note2, AuthenticationInfo.ANONYMOUS); } @Test @@ -1413,7 +1413,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo } catch (Exception e) { assertTrue(e.getMessage().contains("Note '/note1' existed")); } finally { - notebook.removeNote(note1.getId(), anonymous); + notebook.removeNote(note1, anonymous); } } diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java index 31d396e..d1ec696 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java @@ -134,7 +134,8 @@ public class NotebookRepoSyncTest { assertEquals(1, notebookRepoSync.list(1, anonymous).size()); assertEquals(notebookRepoSync.list(0, anonymous).get(0).getId(), notebookRepoSync.list(1, anonymous).get(0).getId()); - notebook.removeNote(notebookRepoSync.list(0, null).get(0).getId(), anonymous); + NoteInfo noteInfo = notebookRepoSync.list(0, null).get(0); + notebook.removeNote(notebookRepoSync.get(noteInfo.getId(), note.getPath(), anonymous), anonymous); } @Test @@ -152,7 +153,8 @@ public class NotebookRepoSyncTest { assertEquals(notebookRepoSync.list(0, anonymous).get(0).getId(), notebookRepoSync.list(1, anonymous).get(0).getId()); /* remove Note */ - notebook.removeNote(notebookRepoSync.list(0, anonymous).get(0).getId(), anonymous); + NoteInfo noteInfo = notebookRepoSync.list(0, null).get(0); + notebook.removeNote(notebookRepoSync.get(noteInfo.getId(), noteInfo.getPath(), anonymous), anonymous); /* check that deleted in both storages */ assertEquals(0, notebookRepoSync.list(0, anonymous).size());