This is an automated email from the ASF dual-hosted git repository. pdallig pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new f65ba50 [ZEPPELIN-5357] Remove lucene writer lock during shutdown f65ba50 is described below commit f65ba502031c6934b941771da322c572b75581a4 Author: Philipp Dallig <philipp.dal...@gmail.com> AuthorDate: Thu May 6 13:13:29 2021 +0200 [ZEPPELIN-5357] Remove lucene writer lock during shutdown ### What is this PR for? Remove the Lucene writer lock during shutdown of the Jetty service. ### What type of PR is it? - Bug Fix ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-5357 ### How should this be tested? * CI ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Philipp Dallig <philipp.dal...@gmail.com> Closes #4109 from Reamer/lucene_close and squashes the following commits: b1dc7f932 [Philipp Dallig] Rewrite NoteEventAsyncListener to use a ThreadPoolExecutor instead of a simple Thread 7bde85927 [Philipp Dallig] close SearchService --- .../zeppelin/service/NotebookServiceTest.java | 4 +- .../zeppelin/notebook/NoteEventAsyncListener.java | 121 ++++++++++----------- .../org/apache/zeppelin/search/LuceneSearch.java | 13 ++- .../apache/zeppelin/search/NoSearchService.java | 2 +- .../org/apache/zeppelin/search/SearchService.java | 3 + .../apache/zeppelin/search/LuceneSearchTest.java | 55 +++++----- 6 files changed, 103 insertions(+), 95 deletions(-) diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java index 7168bbd..4700211 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java @@ -82,6 +82,7 @@ public class NotebookServiceTest { private static NotebookService notebookService; private File notebookDir; + private 
SearchService searchService; private ServiceContext context = new ServiceContext(AuthenticationInfo.ANONYMOUS, new HashSet<>()); @@ -116,7 +117,7 @@ public class NotebookServiceTest { when(mockInterpreterSetting.isUserAuthorized(any())).thenReturn(true); when(mockInterpreterGroup.getInterpreterSetting()).thenReturn(mockInterpreterSetting); when(mockInterpreterSetting.getStatus()).thenReturn(InterpreterSetting.Status.READY); - SearchService searchService = new LuceneSearch(zeppelinConfiguration); + searchService = new LuceneSearch(zeppelinConfiguration); Credentials credentials = new Credentials(); NoteManager noteManager = new NoteManager(notebookRepo); AuthorizationService authorizationService = new AuthorizationService(noteManager, zeppelinConfiguration); @@ -147,6 +148,7 @@ public class NotebookServiceTest { @After public void tearDown() { notebookDir.delete(); + searchService.close(); } @Test diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteEventAsyncListener.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteEventAsyncListener.java index 97f799c..d451d9a 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteEventAsyncListener.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteEventAsyncListener.java @@ -18,28 +18,32 @@ package org.apache.zeppelin.notebook; import org.apache.zeppelin.scheduler.Job; +import org.apache.zeppelin.scheduler.SchedulerThreadFactory; import org.apache.zeppelin.user.AuthenticationInfo; +import org.apache.zeppelin.util.ExecutorUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.BlockingQueue; +import java.io.Closeable; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + /** * An special NoteEventListener which handle events asynchronously */ -public abstract class NoteEventAsyncListener implements NoteEventListener { 
+public abstract class NoteEventAsyncListener implements NoteEventListener, Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(NoteEventAsyncListener.class); - private BlockingQueue<NoteEvent> eventsQueue = new LinkedBlockingQueue<>(); - - private Thread eventHandlerThread; + private final ThreadPoolExecutor executor; + private final String name; - public NoteEventAsyncListener(String name) { - this.eventHandlerThread = new EventHandlingThread(); - this.eventHandlerThread.setName(name); - this.eventHandlerThread.start(); + protected NoteEventAsyncListener(String name) { + this.name = name; + executor = new ThreadPoolExecutor(0, 1, 1, TimeUnit.MINUTES, + new LinkedBlockingQueue<>(), new SchedulerThreadFactory(name)); } public abstract void handleNoteCreateEvent(NoteCreateEvent noteCreateEvent) throws Exception; @@ -55,84 +59,79 @@ public abstract class NoteEventAsyncListener implements NoteEventListener { public abstract void handleParagraphUpdateEvent(ParagraphUpdateEvent paragraphUpdateEvent) throws Exception; + @Override public void close() { - this.eventHandlerThread.interrupt(); + ExecutorUtil.softShutdown(name, executor, 2, TimeUnit.SECONDS); } @Override public void onNoteCreate(Note note, AuthenticationInfo subject) { - eventsQueue.add(new NoteCreateEvent(note, subject)); + executor.execute(new EventHandling(new NoteCreateEvent(note))); } @Override public void onNoteRemove(Note note, AuthenticationInfo subject) { - eventsQueue.add(new NoteRemoveEvent(note, subject)); + executor.execute(new EventHandling(new NoteRemoveEvent(note))); } @Override public void onNoteUpdate(Note note, AuthenticationInfo subject) { - eventsQueue.add(new NoteUpdateEvent(note, subject)); + executor.execute(new EventHandling(new NoteUpdateEvent(note))); } @Override public void onParagraphCreate(Paragraph p) { - eventsQueue.add(new ParagraphCreateEvent(p)); + executor.execute(new EventHandling(new ParagraphCreateEvent(p))); } @Override public void 
onParagraphRemove(Paragraph p) { - eventsQueue.add(new ParagraphRemoveEvent(p)); + executor.execute(new EventHandling(new ParagraphRemoveEvent(p))); } @Override public void onParagraphUpdate(Paragraph p) { - eventsQueue.add(new ParagraphUpdateEvent(p)); + executor.execute(new EventHandling(new ParagraphUpdateEvent(p))); } @Override public void onParagraphStatusChange(Paragraph p, Job.Status status) { - eventsQueue.add(new ParagraphStatusChangeEvent(p)); + executor.execute(new EventHandling(new ParagraphStatusChangeEvent(p))); } - class EventHandlingThread extends Thread { + class EventHandling implements Runnable { + + private final NoteEvent event; + public EventHandling(NoteEvent event) { + this.event = event; + } @Override public void run() { - while(!Thread.interrupted()) { - try { - NoteEvent event = eventsQueue.take(); - if (event instanceof NoteCreateEvent) { - handleNoteCreateEvent((NoteCreateEvent) event); - } else if (event instanceof NoteRemoveEvent) { - handleNoteRemoveEvent((NoteRemoveEvent) event); - } else if (event instanceof NoteUpdateEvent) { - handleNoteUpdateEvent((NoteUpdateEvent) event); - } else if (event instanceof ParagraphCreateEvent) { - handleParagraphCreateEvent((ParagraphCreateEvent) event); - } else if (event instanceof ParagraphRemoveEvent) { - handleParagraphRemoveEvent((ParagraphRemoveEvent) event); - } else if (event instanceof ParagraphUpdateEvent) { - handleParagraphUpdateEvent((ParagraphUpdateEvent) event); - } else { - throw new RuntimeException("Unknown event: " + event.getClass().getSimpleName()); - } - } catch (Exception e) { - LOGGER.error("Fail to handle NoteEvent", e); + try { + if (event instanceof NoteCreateEvent) { + handleNoteCreateEvent((NoteCreateEvent) event); + } else if (event instanceof NoteRemoveEvent) { + handleNoteRemoveEvent((NoteRemoveEvent) event); + } else if (event instanceof NoteUpdateEvent) { + handleNoteUpdateEvent((NoteUpdateEvent) event); + } else if (event instanceof ParagraphCreateEvent) { + 
handleParagraphCreateEvent((ParagraphCreateEvent) event); + } else if (event instanceof ParagraphRemoveEvent) { + handleParagraphRemoveEvent((ParagraphRemoveEvent) event); + } else if (event instanceof ParagraphUpdateEvent) { + handleParagraphUpdateEvent((ParagraphUpdateEvent) event); + } else { + throw new RuntimeException("Unknown event: " + event.getClass().getSimpleName()); } + } catch (Exception e) { + LOGGER.error("Fail to handle NoteEvent", e); } } } - /** - * Used for testing - * - * @throws InterruptedException - */ - public void drainEvents() throws InterruptedException { - while(!eventsQueue.isEmpty()) { - Thread.sleep(1000); - } - Thread.sleep(5000); + public boolean isEventQueueEmpty() { + return executor.getQueue().isEmpty(); } interface NoteEvent { @@ -140,12 +139,10 @@ public abstract class NoteEventAsyncListener implements NoteEventListener { } public static class NoteCreateEvent implements NoteEvent { - private Note note; - private AuthenticationInfo subject; + private final Note note; - public NoteCreateEvent(Note note, AuthenticationInfo subject) { + public NoteCreateEvent(Note note) { this.note = note; - this.subject = subject; } public Note getNote() { @@ -154,12 +151,10 @@ public abstract class NoteEventAsyncListener implements NoteEventListener { } public static class NoteUpdateEvent implements NoteEvent { - private Note note; - private AuthenticationInfo subject; + private final Note note; - public NoteUpdateEvent(Note note, AuthenticationInfo subject) { + public NoteUpdateEvent(Note note) { this.note = note; - this.subject = subject; } public Note getNote() { @@ -169,12 +164,10 @@ public abstract class NoteEventAsyncListener implements NoteEventListener { public static class NoteRemoveEvent implements NoteEvent { - private Note note; - private AuthenticationInfo subject; + private final Note note; - public NoteRemoveEvent(Note note, AuthenticationInfo subject) { + public NoteRemoveEvent(Note note) { this.note = note; - this.subject = 
subject; } public Note getNote() { @@ -183,7 +176,7 @@ public abstract class NoteEventAsyncListener implements NoteEventListener { } public static class ParagraphCreateEvent implements NoteEvent { - private Paragraph p; + private final Paragraph p; public ParagraphCreateEvent(Paragraph p) { this.p = p; @@ -195,7 +188,7 @@ public abstract class NoteEventAsyncListener implements NoteEventListener { } public static class ParagraphUpdateEvent implements NoteEvent { - private Paragraph p; + private final Paragraph p; public ParagraphUpdateEvent(Paragraph p) { this.p = p; @@ -207,7 +200,7 @@ public abstract class NoteEventAsyncListener implements NoteEventListener { } public static class ParagraphRemoveEvent implements NoteEvent { - private Paragraph p; + private final Paragraph p; public ParagraphRemoveEvent(Paragraph p) { this.p = p; @@ -219,7 +212,7 @@ public abstract class NoteEventAsyncListener implements NoteEventListener { } public static class ParagraphStatusChangeEvent implements NoteEvent { - private Paragraph p; + private final Paragraph p; public ParagraphStatusChangeEvent(Paragraph p) { this.p = p; diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/LuceneSearch.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/LuceneSearch.java index 98456ce..3c3645c 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/LuceneSearch.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/LuceneSearch.java @@ -27,6 +27,7 @@ import java.util.Date; import java.util.List; import java.util.Map; import java.util.stream.Stream; +import javax.annotation.PreDestroy; import javax.inject.Inject; import org.apache.lucene.analysis.Analyzer; @@ -84,7 +85,7 @@ public class LuceneSearch extends SearchService { @Inject public LuceneSearch(ZeppelinConfiguration conf) { - super("LuceneSearch-Thread"); + super("LuceneSearch"); if (conf.isZeppelinSearchUseDisk()) { try { @@ -309,8 +310,8 @@ public class LuceneSearch extends 
SearchService { } @Override - public void addParagraphIndex(Paragraph pararaph) throws IOException { - updateDoc(pararaph.getNote().getId(), pararaph.getNote().getName(), pararaph); + public void addParagraphIndex(Paragraph paragraph) throws IOException { + updateDoc(paragraph.getNote().getId(), paragraph.getNote().getName(), paragraph); } /** @@ -364,15 +365,19 @@ public class LuceneSearch extends SearchService { } catch (IOException e) { LOGGER.error("Failed to delete {} from index by '{}'", noteId, fullNoteOrJustParagraph, e); } - LOGGER.debug("Done, index contains {} docs now {}", indexWriter.numDocs()); + LOGGER.debug("Done, index contains {} docs now", indexWriter.numDocs()); } /* (non-Javadoc) * @see org.apache.zeppelin.search.Search#close() */ @Override + @PreDestroy public void close() { + // First interrupt the LuceneSearch-Thread + super.close(); try { + // Second close the indexWriter indexWriter.close(); } catch (IOException e) { LOGGER.error("Failed to .close() the notebook index", e); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/NoSearchService.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/NoSearchService.java index 058e7c5..24af7f0 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/NoSearchService.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/NoSearchService.java @@ -32,7 +32,7 @@ public class NoSearchService extends SearchService { @Inject public NoSearchService() { - super("NoSearchService-Thread"); + super("NoSearchService"); } @Override diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/SearchService.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/SearchService.java index 9ef0217..d17beea 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/SearchService.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/SearchService.java @@ -25,6 +25,8 @@ import org.apache.zeppelin.notebook.Note; 
import org.apache.zeppelin.notebook.NoteEventAsyncListener; import org.apache.zeppelin.notebook.Paragraph; +import javax.annotation.PreDestroy; + /** * Search (both, indexing and query) the notes. * @@ -96,6 +98,7 @@ public abstract class SearchService extends NoteEventAsyncListener { /** * Frees the recourses used by index */ + @PreDestroy public void close() { super.close(); } diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/search/LuceneSearchTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/search/LuceneSearchTest.java index dde2ceb..48d9e84 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/search/LuceneSearchTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/search/LuceneSearchTest.java @@ -21,14 +21,14 @@ import static org.apache.zeppelin.search.LuceneSearch.formatId; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import com.google.common.base.Splitter; import java.io.File; import java.io.IOException; +import java.nio.file.Files; import java.util.List; import java.util.Map; -import com.google.common.io.Files; + import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.InterpreterFactory; import org.apache.zeppelin.interpreter.InterpreterSetting; @@ -54,7 +54,7 @@ public class LuceneSearchTest { @Before public void startUp() throws IOException { - indexDir = Files.createTempDir().getAbsoluteFile(); + indexDir = Files.createTempDirectory("lucene").toFile(); System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_SEARCH_INDEX_PATH.getVarName(), indexDir.getAbsolutePath()); noteSearchService = new LuceneSearch(ZeppelinConfiguration.create()); interpreterSettingManager = mock(InterpreterSettingManager.class); @@ -62,9 +62,9 @@ public class LuceneSearchTest { when(defaultInterpreterSetting.getName()).thenReturn("test"); when(interpreterSettingManager.getDefaultInterpreterSetting()).thenReturn(defaultInterpreterSetting); notebook = new 
Notebook(ZeppelinConfiguration.create(), mock(AuthorizationService.class), mock(NotebookRepo.class), mock(NoteManager.class), - mock(InterpreterFactory.class), interpreterSettingManager, - noteSearchService, - mock(Credentials.class), null); + mock(InterpreterFactory.class), interpreterSettingManager, + noteSearchService, + mock(Credentials.class), null); } @After @@ -73,12 +73,19 @@ public class LuceneSearchTest { indexDir.delete(); } -// @Test + private void drainSearchEvents() throws InterruptedException { + while (!noteSearchService.isEventQueueEmpty()) { + Thread.sleep(1000); + } + Thread.sleep(1000); + } + + @Test public void canIndexAndQuery() throws IOException, InterruptedException { // given Note note1 = newNoteWithParagraph("Notebook1", "test"); Note note2 = newNoteWithParagraphs("Notebook2", "not test", "not test at all"); - noteSearchService.drainEvents(); + drainSearchEvents(); // when List<Map<String, String>> results = noteSearchService.query("all"); @@ -95,7 +102,7 @@ public class LuceneSearchTest { // given Note note1 = newNoteWithParagraph("Notebook1", "test"); Note note2 = newNoteWithParagraphs("Notebook2", "not test", "not test at all"); - noteSearchService.drainEvents(); + drainSearchEvents(); // when List<Map<String, String>> results = noteSearchService.query("Notebook1"); @@ -111,7 +118,7 @@ public class LuceneSearchTest { // given Note note1 = newNoteWithParagraph("Notebook1", "test", "testingTitleSearch"); Note note2 = newNoteWithParagraph("Notebook2", "not test", "notTestingTitleSearch"); - noteSearchService.drainEvents(); + drainSearchEvents(); // when List<Map<String, String>> results = noteSearchService.query("testingTitleSearch"); @@ -128,16 +135,15 @@ public class LuceneSearchTest { assertThat(TitleHits).isAtLeast(1); } - //@Test - public void indexKeyContract() throws IOException { - // give + @Test + public void indexKeyContract() throws IOException, InterruptedException { + // given Note note1 = newNoteWithParagraph("Notebook1", 
"test"); + drainSearchEvents(); // when - noteSearchService.addNoteIndex(note1); - // then String id = resultForQuery("test").get(0).get("id"); // LuceneSearch.ID_FIELD - - assertThat(Splitter.on("/").split(id)) // key structure <noteId>/paragraph/<paragraphId> + // then + assertThat(id.split("/")).asList() // key structure <noteId>/paragraph/<paragraphId> .containsAllOf( note1.getId(), "paragraph", note1.getLastParagraph().getId()); // LuceneSearch.PARAGRAPH } @@ -158,7 +164,7 @@ public class LuceneSearchTest { // given Note note1 = newNoteWithParagraph("Notebook1", "test"); Note note2 = newNoteWithParagraphs("Notebook2", "not test", "not test at all"); - noteSearchService.drainEvents(); + drainSearchEvents(); // when Paragraph p2 = note2.getLastParagraph(); @@ -187,7 +193,7 @@ public class LuceneSearchTest { // given Note note1 = newNoteWithParagraph("Notebook1", "test"); Note note2 = newNoteWithParagraphs("Notebook2", "not test", "not test at all"); - noteSearchService.drainEvents(); + drainSearchEvents(); assertThat(resultForQuery("Notebook2")).isNotEmpty(); @@ -208,7 +214,7 @@ public class LuceneSearchTest { // given: total 2 notebooks, 3 paragraphs Note note1 = newNoteWithParagraph("Notebook1", "test"); Note note2 = newNoteWithParagraphs("Notebook2", "not test", "not test at all"); - noteSearchService.drainEvents(); + drainSearchEvents(); assertThat(resultForQuery("test").size()).isEqualTo(3); @@ -217,7 +223,7 @@ public class LuceneSearchTest { p1.setText("no no no"); notebook.saveNote(note1, AuthenticationInfo.ANONYMOUS); p1.getNote().fireParagraphUpdateEvent(p1); - noteSearchService.drainEvents(); + drainSearchEvents(); // then assertThat(resultForQuery("Notebook1").size()).isEqualTo(1); @@ -237,13 +243,13 @@ public class LuceneSearchTest { // given: total 2 notebooks, 3 paragraphs Note note1 = newNoteWithParagraph("Notebook1", "test"); Note note2 = newNoteWithParagraphs("Notebook2", "not test", "not test at all"); - noteSearchService.drainEvents(); + 
drainSearchEvents(); assertThat(resultForQuery("test").size()).isEqualTo(3); // when note1.setName("NotebookN"); notebook.updateNote(note1, AuthenticationInfo.ANONYMOUS); - noteSearchService.drainEvents(); + drainSearchEvents(); Thread.sleep(1000); // then assertThat(resultForQuery("Notebook1")).isEmpty(); @@ -297,7 +303,6 @@ public class LuceneSearchTest { } private Note newNote(String name) throws IOException { - Note note = notebook.createNote(name, AuthenticationInfo.ANONYMOUS); - return note; + return notebook.createNote(name, AuthenticationInfo.ANONYMOUS); } }