This is an automated email from the ASF dual-hosted git repository. siddteotia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 9bcbef80cc Handle interrupt gracefully for Lucene mutable index (#11558) 9bcbef80cc is described below commit 9bcbef80ccfb9ae4379f3909c012317ea9cf0921 Author: Christopher Peck <27231838+itschrisp...@users.noreply.github.com> AuthorDate: Wed Sep 20 12:05:44 2023 -0700 Handle interrupt gracefully for Lucene mutable index (#11558) * handle interrupt for lucene mutable index * address comments, use thread pool * update missed test * init pool prior to state transition * use clearer terminology --- .../RealtimeLuceneDocIdCollector.java | 10 +++- .../invertedindex/RealtimeLuceneTextIndex.java | 57 ++++++++++++++-------- .../RealtimeLuceneTextIndexSearcherPool.java | 56 +++++++++++++++++++++ .../invertedindex/LuceneMutableTextIndexTest.java | 31 ++++++++++++ .../NativeAndLuceneMutableTextIndexTest.java | 2 + .../server/starter/helix/BaseServerStarter.java | 9 ++++ 6 files changed, 145 insertions(+), 20 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneDocIdCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneDocIdCollector.java index 14235f1428..2b46061e9e 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneDocIdCollector.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneDocIdCollector.java @@ -35,11 +35,12 @@ import org.roaringbitmap.buffer.MutableRoaringBitmap; * for the time-being. 
Once we have optimized the realtime, we can */ public class RealtimeLuceneDocIdCollector implements Collector { - + private volatile boolean _shouldCancel; private final MutableRoaringBitmap _docIds; public RealtimeLuceneDocIdCollector(MutableRoaringBitmap docIds) { _docIds = docIds; + _shouldCancel = false; } @Override @@ -60,9 +61,16 @@ public class RealtimeLuceneDocIdCollector implements Collector { @Override public void collect(int doc) throws IOException { + if (_shouldCancel) { + throw new RuntimeException("TEXT_MATCH query was cancelled"); + } // Compute the absolute lucene docID across sub-indexes as doc that is passed is relative to the current reader _docIds.add(context.docBase + doc); } }; } + + public void markShouldCancel() { + _shouldCancel = true; + } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java index f5ddd1f76f..b43d05ec0b 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java @@ -20,11 +20,12 @@ package org.apache.pinot.segment.local.realtime.impl.invertedindex; import java.io.File; import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; import org.apache.lucene.analysis.standard.StandardAnalyzer; import org.apache.lucene.document.Document; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.queryparser.classic.QueryParser; -import org.apache.lucene.search.Collector; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; import org.apache.lucene.search.SearcherManager; @@ -45,6 +46,8 @@ import org.slf4j.LoggerFactory; */ public class 
RealtimeLuceneTextIndex implements MutableTextIndex { private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(RealtimeLuceneTextIndex.class); + private static final RealtimeLuceneTextIndexSearcherPool SEARCHER_POOL = + RealtimeLuceneTextIndexSearcherPool.getInstance(); private final LuceneTextIndexCreator _indexCreator; private SearcherManager _searcherManager; private final StandardAnalyzer _analyzer = new StandardAnalyzer(); @@ -109,27 +112,43 @@ public class RealtimeLuceneTextIndex implements MutableTextIndex { @Override public MutableRoaringBitmap getDocIds(String searchQuery) { MutableRoaringBitmap docIDs = new MutableRoaringBitmap(); - Collector docIDCollector = new RealtimeLuceneDocIdCollector(docIDs); - IndexSearcher indexSearcher = null; - try { - Query query = new QueryParser(_column, _analyzer).parse(searchQuery); - indexSearcher = _searcherManager.acquire(); - indexSearcher.search(query, docIDCollector); - return getPinotDocIds(indexSearcher, docIDs); - } catch (Exception e) { - LOGGER - .error("Failed while searching the realtime text index for column {}, search query {}, exception {}", _column, - searchQuery, e.getMessage()); - throw new RuntimeException(e); - } finally { + RealtimeLuceneDocIdCollector docIDCollector = new RealtimeLuceneDocIdCollector(docIDs); + // A thread interrupt during indexSearcher.search() can break the underlying FSDirectory used by the IndexWriter + // which the SearcherManager is created with. To ensure the index is never corrupted the search is executed + // in a child thread and the interrupt is handled in the current thread by canceling the search gracefully. 
+ // See https://github.com/apache/lucene/issues/3315 and https://github.com/apache/lucene/issues/9309 + Callable<MutableRoaringBitmap> searchCallable = () -> { + IndexSearcher indexSearcher = null; try { - if (indexSearcher != null) { - _searcherManager.release(indexSearcher); + Query query = new QueryParser(_column, _analyzer).parse(searchQuery); + indexSearcher = _searcherManager.acquire(); + indexSearcher.search(query, docIDCollector); + return getPinotDocIds(indexSearcher, docIDs); + } finally { + try { + if (indexSearcher != null) { + _searcherManager.release(indexSearcher); + } + } catch (Exception e) { + LOGGER.error( + "Failed while releasing the searcher manager for realtime text index for column {}, exception {}", + _column, e.getMessage()); } - } catch (Exception e) { - LOGGER.error("Failed while releasing the searcher manager for realtime text index for column {}, exception {}", - _column, e.getMessage()); } + }; + Future<MutableRoaringBitmap> searchFuture = + SEARCHER_POOL.getExecutorService().submit(searchCallable); + try { + return searchFuture.get(); + } catch (InterruptedException e) { + docIDCollector.markShouldCancel(); + LOGGER.warn("TEXT_MATCH query timeout on realtime consuming segment {}, column {}, search query {}", _segmentName, + _column, searchQuery); + throw new RuntimeException("TEXT_MATCH query timeout on realtime consuming segment"); + } catch (Exception e) { + LOGGER.error("Failed while searching the realtime text index for segment {}, column {}, search query {}," + + " exception {}", _segmentName, _column, searchQuery, e.getMessage()); + throw new RuntimeException(e); } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndexSearcherPool.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndexSearcherPool.java new file mode 100644 index 0000000000..d1372106e0 --- /dev/null +++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndexSearcherPool.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.realtime.impl.invertedindex; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + + +/** + * This class manages a thread pool used for searching over realtime Lucene segments by {@link RealtimeLuceneTextIndex}. + * The pool max size is equivalent to pinot.query.scheduler.query_worker_threads to ensure each worker thread can have + * an accompanying Lucene searcher thread if needed. init() is called in BaseServerStarter to avoid creating a + * dependency on pinot-core. 
+ */ +public class RealtimeLuceneTextIndexSearcherPool { + private static RealtimeLuceneTextIndexSearcherPool _singletonInstance; + private static ExecutorService _executorService; + + private RealtimeLuceneTextIndexSearcherPool(int size) { + _executorService = new ThreadPoolExecutor(0, size, 500, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); + } + + public static RealtimeLuceneTextIndexSearcherPool getInstance() { + if (_singletonInstance == null) { + throw new AssertionError("RealtimeLuceneTextIndexSearcherPool.init() must be called first"); + } + return _singletonInstance; + } + + public static RealtimeLuceneTextIndexSearcherPool init(int size) { + _singletonInstance = new RealtimeLuceneTextIndexSearcherPool(size); + return _singletonInstance; + } + + public ExecutorService getExecutorService() { + return _executorService; + } +} diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneMutableTextIndexTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneMutableTextIndexTest.java index d521a981f2..e7046f051a 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneMutableTextIndexTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneMutableTextIndexTest.java @@ -19,9 +19,14 @@ package org.apache.pinot.segment.local.realtime.impl.invertedindex; import java.io.File; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import org.apache.commons.io.FileUtils; import org.apache.lucene.search.SearcherManager; import org.roaringbitmap.buffer.ImmutableRoaringBitmap; +import org.roaringbitmap.buffer.MutableRoaringBitmap; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import 
org.testng.annotations.Test; @@ -32,6 +37,8 @@ import static org.testng.Assert.assertEquals; public class LuceneMutableTextIndexTest { private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "LuceneMutableIndexTest"); private static final String TEXT_COLUMN_NAME = "testColumnName"; + private static final RealtimeLuceneTextIndexSearcherPool SEARCHER_POOL = + RealtimeLuceneTextIndexSearcherPool.init(1); private RealtimeLuceneTextIndex _realtimeLuceneTextIndex; @@ -41,16 +48,30 @@ public class LuceneMutableTextIndexTest { }; } + private String[][] getRepeatedData() { + return new String[][]{ + {"distributed storage", "multi-threading"} + }; + } + @BeforeClass public void setUp() throws Exception { _realtimeLuceneTextIndex = new RealtimeLuceneTextIndex(TEXT_COLUMN_NAME, INDEX_DIR, "fooBar", null, null, true, 500); String[][] documents = getTextData(); + String[][] repeatedDocuments = getRepeatedData(); + for (String[] row : documents) { _realtimeLuceneTextIndex.add(row); } + for (int i = 0; i < 1000; i++) { + for (String[] row : repeatedDocuments) { + _realtimeLuceneTextIndex.add(row); + } + } + SearcherManager searcherManager = _realtimeLuceneTextIndex.getSearcherManager(); try { searcherManager.maybeRefresh(); @@ -70,4 +91,14 @@ public class LuceneMutableTextIndexTest { assertEquals(_realtimeLuceneTextIndex.getDocIds("/.*house.*/"), ImmutableRoaringBitmap.bitmapOf(1)); assertEquals(_realtimeLuceneTextIndex.getDocIds("invalid"), ImmutableRoaringBitmap.bitmapOf()); } + + @Test(expectedExceptions = ExecutionException.class, + expectedExceptionsMessageRegExp = ".*TEXT_MATCH query timeout on realtime consuming segment.*") + public void testQueryCancellationIsSuccessful() + throws InterruptedException, ExecutionException { + ExecutorService executor = Executors.newSingleThreadExecutor(); + Future<MutableRoaringBitmap> res = executor.submit(() -> _realtimeLuceneTextIndex.getDocIds("/.*read.*/")); + executor.shutdownNow(); + res.get(); + } } diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeAndLuceneMutableTextIndexTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeAndLuceneMutableTextIndexTest.java index 31a82bd343..a7c85c6b8d 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeAndLuceneMutableTextIndexTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeAndLuceneMutableTextIndexTest.java @@ -34,6 +34,8 @@ public class NativeAndLuceneMutableTextIndexTest { private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "RealTimeNativeVsLuceneTest"); private static final String TEXT_COLUMN_NAME = "testColumnName"; private static final String MV_TEXT_COLUMN_NAME = "testMVColumnName"; + private static final RealtimeLuceneTextIndexSearcherPool SEARCHER_POOL = + RealtimeLuceneTextIndexSearcherPool.init(1); private RealtimeLuceneTextIndex _realtimeLuceneTextIndex; private NativeMutableTextIndex _nativeMutableTextIndex; diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java index dac34e210b..2b771707d2 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java @@ -66,9 +66,11 @@ import org.apache.pinot.common.version.PinotVersion; import org.apache.pinot.core.common.datatable.DataTableBuilderFactory; import org.apache.pinot.core.data.manager.InstanceDataManager; import org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager; +import org.apache.pinot.core.query.scheduler.resources.ResourceManager; import org.apache.pinot.core.transport.ListenerConfig; import 
org.apache.pinot.core.util.ListenerConfigUtil; import org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneIndexRefreshState; +import org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneTextIndexSearcherPool; import org.apache.pinot.segment.spi.memory.PinotDataBuffer; import org.apache.pinot.server.access.AccessControlFactory; import org.apache.pinot.server.api.AdminApiApplication; @@ -137,6 +139,7 @@ public abstract class BaseServerStarter implements ServiceStartable { protected AdminApiApplication _adminApiApplication; protected ServerQueriesDisabledTracker _serverQueriesDisabledTracker; protected RealtimeLuceneIndexRefreshState _realtimeLuceneIndexRefreshState; + protected RealtimeLuceneTextIndexSearcherPool _realtimeLuceneTextIndexSearcherPool; protected PinotEnvironmentProvider _pinotEnvironmentProvider; protected volatile boolean _isServerReadyToServeQueries = false; @@ -556,6 +559,12 @@ public abstract class BaseServerStarter implements ServiceStartable { + "'", e); } + // Create a thread pool used for mutable lucene index searches, with size based on query_worker_threads config + LOGGER.info("Initializing lucene searcher thread pool"); + int queryWorkerThreads = + _serverConf.getProperty(ResourceManager.QUERY_WORKER_CONFIG_KEY, ResourceManager.DEFAULT_QUERY_WORKER_THREADS); + _realtimeLuceneTextIndexSearcherPool = RealtimeLuceneTextIndexSearcherPool.init(queryWorkerThreads); + LOGGER.info("Initializing server instance and registering state model factory"); Utils.logVersions(); ControllerLeaderLocator.create(_helixManager); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org