This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch branch-0.9 in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.9 by this push: new e15c515 [ZEPPELIN-5153]. Too many unnecessary indexing in LuceneSearch e15c515 is described below commit e15c515de9f864828bbd441ce10a8ce50f8c631d Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Thu Dec 17 21:41:51 2020 +0800 [ZEPPELIN-5153]. Too many unnecessary indexing in LuceneSearch ### What is this PR for? There is a lot of unnecessary indexing in LuceneSearch, which causes performance issues when there are lots of jobs running in Zeppelin. This PR fixes it. * Only index the current paragraph when a paragraph is updated (previously, all the paragraphs of the note were re-indexed). * Only index/update the note name index when a note is updated. ### What type of PR is it? [Bug Fix] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-5153 ### How should this be tested? * Manually tested ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #3992 from zjffdu/ZEPPELIN-5153 and squashes the following commits: 6ec0627f4 [Jeff Zhang] Fix comment 76fa0ec18 [Jeff Zhang] [ZEPPELIN-5153].
Too many unnecessary indexing in LuceneSearch (cherry picked from commit e95af9fffeb6981493de49afe8354bf4d8472ce0) Signed-off-by: Jeff Zhang <zjf...@apache.org> --- .../apache/zeppelin/service/NotebookService.java | 2 +- .../apache/zeppelin/rest/ZeppelinRestApiTest.java | 2 +- .../zeppelin/notebook/NoteEventAsyncListener.java | 20 +++-- .../org/apache/zeppelin/notebook/Notebook.java | 4 + .../org/apache/zeppelin/search/LuceneSearch.java | 96 ++++++++-------------- .../org/apache/zeppelin/search/SearchService.java | 66 +++++++-------- .../apache/zeppelin/search/LuceneSearchTest.java | 12 +-- 7 files changed, 90 insertions(+), 112 deletions(-) diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java b/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java index c6f4124..44477fb 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java @@ -746,7 +746,7 @@ public class NotebookService { schedulerService.refreshCron(note.getId()); } - notebook.saveNote(note, context.getAutheInfo()); + notebook.updateNote(note, context.getAutheInfo()); callback.onSuccess(note, context); } diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java index 073fe21..f032984 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java @@ -944,7 +944,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi { "\"text\": \"ThisIsToTestSearchMethodWithTitle \"}"; CloseableHttpResponse postNoteText = httpPost("/notebook/" + note.getId() + "/paragraph", jsonRequest); postNoteText.close(); - Thread.sleep(1000); + Thread.sleep(3000); CloseableHttpResponse searchNote = 
httpGet("/notebook/search?q='testTitleSearchOfParagraph'"); Map<String, Object> respSearchResult = gson.fromJson(EntityUtils.toString(searchNote.getEntity(), StandardCharsets.UTF_8), diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteEventAsyncListener.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteEventAsyncListener.java index b593673..97f799c 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteEventAsyncListener.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteEventAsyncListener.java @@ -19,6 +19,8 @@ package org.apache.zeppelin.notebook; import org.apache.zeppelin.scheduler.Job; import org.apache.zeppelin.user.AuthenticationInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -28,6 +30,8 @@ import java.util.concurrent.LinkedBlockingQueue; */ public abstract class NoteEventAsyncListener implements NoteEventListener { + private static final Logger LOGGER = LoggerFactory.getLogger(NoteEventAsyncListener.class); + private BlockingQueue<NoteEvent> eventsQueue = new LinkedBlockingQueue<>(); private Thread eventHandlerThread; @@ -38,17 +42,17 @@ public abstract class NoteEventAsyncListener implements NoteEventListener { this.eventHandlerThread.start(); } - public abstract void handleNoteCreateEvent(NoteCreateEvent noteCreateEvent); + public abstract void handleNoteCreateEvent(NoteCreateEvent noteCreateEvent) throws Exception; - public abstract void handleNoteRemoveEvent(NoteRemoveEvent noteRemoveEvent); + public abstract void handleNoteRemoveEvent(NoteRemoveEvent noteRemoveEvent) throws Exception; - public abstract void handleNoteUpdateEvent(NoteUpdateEvent noteUpdateEvent); + public abstract void handleNoteUpdateEvent(NoteUpdateEvent noteUpdateEvent) throws Exception; - public abstract void handleParagraphCreateEvent(ParagraphCreateEvent paragraphCreateEvent); + 
public abstract void handleParagraphCreateEvent(ParagraphCreateEvent paragraphCreateEvent) throws Exception; - public abstract void handleParagraphRemoveEvent(ParagraphRemoveEvent paragraphRemoveEvent); + public abstract void handleParagraphRemoveEvent(ParagraphRemoveEvent paragraphRemoveEvent) throws Exception; - public abstract void handleParagraphUpdateEvent(ParagraphUpdateEvent paragraphUpdateEvent); + public abstract void handleParagraphUpdateEvent(ParagraphUpdateEvent paragraphUpdateEvent) throws Exception; public void close() { @@ -112,8 +116,8 @@ public abstract class NoteEventAsyncListener implements NoteEventListener { } else { throw new RuntimeException("Unknown event: " + event.getClass().getSimpleName()); } - } catch (InterruptedException e) { - e.printStackTrace(); + } catch (Exception e) { + LOGGER.error("Fail to handle NoteEvent", e); } } } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java index 39dea80..94614bb 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java @@ -377,6 +377,10 @@ public class Notebook { public void saveNote(Note note, AuthenticationInfo subject) throws IOException { noteManager.saveNote(note, subject); + } + + public void updateNote(Note note, AuthenticationInfo subject) throws IOException { + noteManager.saveNote(note, subject); fireNoteUpdateEvent(note, subject); } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/LuceneSearch.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/LuceneSearch.java index f0ccce0..98456ce 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/LuceneSearch.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/LuceneSearch.java @@ -22,12 +22,10 @@ import com.google.common.collect.Lists; import 
java.io.IOException; import java.nio.file.Path; import java.nio.file.Paths; -import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import java.util.stream.Stream; import javax.inject.Inject; @@ -204,17 +202,14 @@ public class LuceneSearch extends SearchService { * @see org.apache.zeppelin.search.Search#updateIndexDoc(org.apache.zeppelin.notebook.Note) */ @Override - public void updateIndexDoc(Note note) throws IOException { + public void updateNoteIndex(Note note) throws IOException { updateIndexNoteName(note); - for (Paragraph p : note.getParagraphs()) { - updateIndexParagraph(note, p); - } } private void updateIndexNoteName(Note note) throws IOException { String noteName = note.getName(); String noteId = note.getId(); - LOGGER.debug("Indexing Notebook {}, '{}'", noteId, noteName); + LOGGER.debug("Update note index: {}, '{}'", noteId, noteName); if (null == noteName || noteName.isEmpty()) { LOGGER.debug("Skipping empty notebook name"); return; @@ -222,12 +217,10 @@ public class LuceneSearch extends SearchService { updateDoc(noteId, noteName, null); } - private void updateIndexParagraph(Note note, Paragraph p) throws IOException { - if (p.getText() == null) { - LOGGER.debug("Skipping empty paragraph"); - return; - } - updateDoc(note.getId(), note.getName(), p); + @Override + public void updateParagraphIndex(Paragraph p) throws IOException { + LOGGER.debug("Update paragraph index: {}", p.getId()); + updateDoc(p.getNote().getId(), p.getNote().getName(), p); } /** @@ -288,7 +281,9 @@ public class LuceneSearch extends SearchService { doc.add(new StringField("title", noteName, Field.Store.YES)); if (null != p) { - doc.add(new TextField(SEARCH_FIELD_TEXT, p.getText(), Field.Store.YES)); + if (p.getText() != null) { + doc.add(new TextField(SEARCH_FIELD_TEXT, p.getText(), Field.Store.YES)); + } if (p.getTitle() != null) { doc.add(new TextField(SEARCH_FIELD_TITLE, 
p.getTitle(), Field.Store.YES)); } @@ -301,38 +296,10 @@ public class LuceneSearch extends SearchService { } /* (non-Javadoc) - * @see org.apache.zeppelin.search.Search#addIndexDocs(java.util.Collection) - */ - @Override - public void addIndexDocs(Collection<Note> collection) { - int docsIndexed = 0; - long start = System.nanoTime(); - try { - for (Note note : collection) { - addIndexDocAsync(note); - docsIndexed++; - } - } catch (IOException e) { - LOGGER.error("Failed to index all Notebooks", e); - } finally { - try { // save what's been indexed, even if not full collection - indexWriter.commit(); - } catch (IOException e) { - LOGGER.error("Failed to save index", e); - } - long end = System.nanoTime(); - LOGGER.info( - "Indexing {} notebooks took {}ms", - docsIndexed, - TimeUnit.NANOSECONDS.toMillis(end - start)); - } - } - - /* (non-Javadoc) * @see org.apache.zeppelin.search.Search#addIndexDoc(org.apache.zeppelin.notebook.Note) */ @Override - public void addIndexDoc(Note note) { + public void addNoteIndex(Note note) { try { addIndexDocAsync(note); indexWriter.commit(); @@ -341,6 +308,11 @@ public class LuceneSearch extends SearchService { } } + @Override + public void addParagraphIndex(Paragraph pararaph) throws IOException { + updateDoc(pararaph.getNote().getId(), pararaph.getNote().getName(), pararaph); + } + /** * Indexes the given notebook, but does not commit changes. 
* @@ -349,12 +321,8 @@ public class LuceneSearch extends SearchService { */ private void addIndexDocAsync(Note note) throws IOException { indexNoteName(indexWriter, note.getId(), note.getName()); - for (Paragraph doc : note.getParagraphs()) { - if (doc.getText() == null) { - LOGGER.debug("Skipping empty paragraph"); - continue; - } - indexDoc(indexWriter, note.getId(), note.getName(), doc); + for (Paragraph paragraph : note.getParagraphs()) { + updateDoc(note.getId(), note.getName(), paragraph); } } @@ -362,8 +330,14 @@ public class LuceneSearch extends SearchService { * @see org.apache.zeppelin.search.Search#deleteIndexDocs(org.apache.zeppelin.notebook.Note) */ @Override - public void deleteIndexDocs(String noteId) { - deleteDoc(noteId, null); + public void deleteNoteIndex(Note note) { + if (note == null) { + return; + } + deleteDoc(note.getId(), null); + for (Paragraph paragraph : note.getParagraphs()) { + deleteParagraphIndex(note.getId(), paragraph); + } } /* (non-Javadoc) @@ -371,10 +345,16 @@ public class LuceneSearch extends SearchService { * #deleteIndexDoc(org.apache.zeppelin.notebook.Note, org.apache.zeppelin.notebook.Paragraph) */ @Override - public void deleteIndexDoc(String noteId, Paragraph p) { + public void deleteParagraphIndex(String noteId, Paragraph p) { deleteDoc(noteId, p); } + /** + * Delete note index of paragraph index (when p is not null). 
+ * + * @param noteId + * @param p + */ private void deleteDoc(String noteId, Paragraph p) { String fullNoteOrJustParagraph = formatDeleteId(noteId, p); LOGGER.debug("Deleting note {}, out of: {}", noteId, indexWriter.numDocs()); @@ -410,15 +390,7 @@ public class LuceneSearch extends SearchService { LOGGER.debug("Skipping empty notebook name"); return; } - indexDoc(w, noteId, noteName, null); - } - - /** Indexes a single document: - code of the paragraph (if non-null) - or just a note name */ - private void indexDoc(IndexWriter w, String noteId, String noteName, Paragraph p) - throws IOException { - String id = formatId(noteId, p); - Document doc = newDocument(id, noteName, p); - w.addDocument(doc); + updateDoc(noteId, noteName, null); } @Override @@ -426,7 +398,7 @@ public class LuceneSearch extends SearchService { Thread thread = new Thread(() -> { LOGGER.info("Starting rebuild index"); notes.forEach(note -> { - addIndexDoc(note); + addNoteIndex(note); note.unLoad(); }); LOGGER.info("Finish rebuild index"); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/SearchService.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/SearchService.java index bb4c189..9ef0217 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/SearchService.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/SearchService.java @@ -17,7 +17,6 @@ package org.apache.zeppelin.search; import java.io.IOException; -import java.util.Collection; import java.util.List; import java.util.Map; import java.util.stream.Stream; @@ -48,33 +47,42 @@ public abstract class SearchService extends NoteEventAsyncListener { public abstract List<Map<String, String>> query(String queryStr); /** - * Updates all documents in index for the given note: - * - name - * - all paragraphs + * Updates note index for the given note, only update index of note meta info, + * such as id,name. Paragraph index will be done in method updateParagraphIndex. 
* * @param note a Note to update index for * @throws IOException */ - public abstract void updateIndexDoc(Note note) throws IOException; + public abstract void updateNoteIndex(Note note) throws IOException; /** - * Indexes full collection of notes: all the paragraphs + Note names + * Updates paragraph index for the given paragraph. * - * @param collection of Notes + * @param paragraph a Paragraph to update index for + * @throws IOException */ - public abstract void addIndexDocs(Collection<Note> collection); + + public abstract void updateParagraphIndex(Paragraph paragraph) throws IOException; /** * Indexes the given note. * * @throws IOException If there is a low-level I/O error */ - public abstract void addIndexDoc(Note note); + public abstract void addNoteIndex(Note note) throws IOException; + + /** + * Indexes the given paragraph. + * + * @throws IOException If there is a low-level I/O error + */ + public abstract void addParagraphIndex(Paragraph pargaraph) throws IOException; + /** * Deletes all docs on given Note from index */ - public abstract void deleteIndexDocs(String noteId); + public abstract void deleteNoteIndex(Note note) throws IOException; /** * Deletes doc for a given @@ -83,7 +91,7 @@ public abstract class SearchService extends NoteEventAsyncListener { * @param p * @throws IOException */ - public abstract void deleteIndexDoc(String noteId, Paragraph p); + public abstract void deleteParagraphIndex(String noteId, Paragraph p) throws IOException; /** * Frees the recourses used by index @@ -93,46 +101,34 @@ public abstract class SearchService extends NoteEventAsyncListener { } @Override - public void handleNoteCreateEvent(NoteCreateEvent noteCreateEvent) { - addIndexDoc(noteCreateEvent.getNote()); + public void handleNoteCreateEvent(NoteCreateEvent noteCreateEvent) throws Exception { + addNoteIndex(noteCreateEvent.getNote()); } @Override - public void handleNoteRemoveEvent(NoteRemoveEvent noteRemoveEvent) { - 
deleteIndexDocs(noteRemoveEvent.getNote().getId()); + public void handleNoteRemoveEvent(NoteRemoveEvent noteRemoveEvent) throws Exception { + deleteNoteIndex(noteRemoveEvent.getNote()); } @Override - public void handleNoteUpdateEvent(NoteUpdateEvent noteUpdateEvent) { - try { - updateIndexDoc(noteUpdateEvent.getNote()); - } catch (IOException e) { - e.printStackTrace(); - } + public void handleNoteUpdateEvent(NoteUpdateEvent noteUpdateEvent) throws Exception { + updateNoteIndex(noteUpdateEvent.getNote()); } @Override - public void handleParagraphCreateEvent(ParagraphCreateEvent paragraphCreateEvent) { - try { - updateIndexDoc(paragraphCreateEvent.getParagraph().getNote()); - } catch (IOException e) { - e.printStackTrace(); - } + public void handleParagraphCreateEvent(ParagraphCreateEvent paragraphCreateEvent) throws Exception { + addParagraphIndex(paragraphCreateEvent.getParagraph()); } @Override - public void handleParagraphRemoveEvent(ParagraphRemoveEvent paragraphRemoveEvent) { + public void handleParagraphRemoveEvent(ParagraphRemoveEvent paragraphRemoveEvent) throws Exception { Paragraph p = paragraphRemoveEvent.getParagraph(); - deleteIndexDoc(p.getNote().getId(), p); + deleteParagraphIndex(p.getNote().getId(), p); } @Override - public void handleParagraphUpdateEvent(ParagraphUpdateEvent paragraphUpdateEvent) { - try { - updateIndexDoc(paragraphUpdateEvent.getParagraph().getNote()); - } catch (IOException e) { - e.printStackTrace(); - } + public void handleParagraphUpdateEvent(ParagraphUpdateEvent paragraphUpdateEvent) throws Exception { + updateParagraphIndex(paragraphUpdateEvent.getParagraph()); } public abstract void startRebuildIndex(Stream<Note> notes); diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/search/LuceneSearchTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/search/LuceneSearchTest.java index 7857021..dde2ceb 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/search/LuceneSearchTest.java +++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/search/LuceneSearchTest.java @@ -133,7 +133,7 @@ public class LuceneSearchTest { // give Note note1 = newNoteWithParagraph("Notebook1", "test"); // when - noteSearchService.addIndexDoc(note1); + noteSearchService.addNoteIndex(note1); // then String id = resultForQuery("test").get(0).get("id"); // LuceneSearch.ID_FIELD @@ -163,7 +163,8 @@ public class LuceneSearchTest { // when Paragraph p2 = note2.getLastParagraph(); p2.setText("test indeed"); - noteSearchService.updateIndexDoc(note2); + noteSearchService.updateNoteIndex(note2); + noteSearchService.updateParagraphIndex(p2); // then List<Map<String, String>> results = noteSearchService.query("all"); @@ -178,7 +179,7 @@ public class LuceneSearchTest { // give // looks like a bug in web UI: it tries to delete a note twice (after it has just been deleted) // when - noteSearchService.deleteIndexDocs(null); + noteSearchService.deleteNoteIndex(null); } @Test @@ -191,7 +192,7 @@ public class LuceneSearchTest { assertThat(resultForQuery("Notebook2")).isNotEmpty(); // when - noteSearchService.deleteIndexDocs(note2.getId()); + noteSearchService.deleteNoteIndex(note2); // then assertThat(noteSearchService.query("all")).isEmpty(); @@ -215,6 +216,7 @@ public class LuceneSearchTest { Paragraph p1 = note1.getLastParagraph(); p1.setText("no no no"); notebook.saveNote(note1, AuthenticationInfo.ANONYMOUS); + p1.getNote().fireParagraphUpdateEvent(p1); noteSearchService.drainEvents(); // then @@ -240,7 +242,7 @@ public class LuceneSearchTest { // when note1.setName("NotebookN"); - notebook.saveNote(note1, AuthenticationInfo.ANONYMOUS); + notebook.updateNote(note1, AuthenticationInfo.ANONYMOUS); noteSearchService.drainEvents(); Thread.sleep(1000); // then