This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9bcbef80cc Handle interrupt gracefully for Lucene mutable index (#11558)
9bcbef80cc is described below

commit 9bcbef80ccfb9ae4379f3909c012317ea9cf0921
Author: Christopher Peck <27231838+itschrisp...@users.noreply.github.com>
AuthorDate: Wed Sep 20 12:05:44 2023 -0700

    Handle interrupt gracefully for Lucene mutable index (#11558)
    
    * handle interrupt for lucene mutable index
    
    * address comments, use thread pool
    
    * update missed test
    
    * init pool prior to state transition
    
    * use clearer terminology
---
 .../RealtimeLuceneDocIdCollector.java              | 10 +++-
 .../invertedindex/RealtimeLuceneTextIndex.java     | 57 ++++++++++++++--------
 .../RealtimeLuceneTextIndexSearcherPool.java       | 56 +++++++++++++++++++++
 .../invertedindex/LuceneMutableTextIndexTest.java  | 31 ++++++++++++
 .../NativeAndLuceneMutableTextIndexTest.java       |  2 +
 .../server/starter/helix/BaseServerStarter.java    |  9 ++++
 6 files changed, 145 insertions(+), 20 deletions(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneDocIdCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneDocIdCollector.java
index 14235f1428..2b46061e9e 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneDocIdCollector.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneDocIdCollector.java
@@ -35,11 +35,12 @@ import org.roaringbitmap.buffer.MutableRoaringBitmap;
  * for the time-being. Once we have optimized the realtime, we can
  */
 public class RealtimeLuceneDocIdCollector implements Collector {
-
+  // Written by markShouldCancel() and read by collect(), possibly on different threads, hence volatile.
+  private volatile boolean _shouldCancel;
   private final MutableRoaringBitmap _docIds;
 
   public RealtimeLuceneDocIdCollector(MutableRoaringBitmap docIds) {
     _docIds = docIds;
   }
 
   @Override
@@ -60,9 +61,20 @@ public class RealtimeLuceneDocIdCollector implements Collector {
       @Override
       public void collect(int doc)
           throws IOException {
+        if (_shouldCancel) {
+          throw new RuntimeException("TEXT_MATCH query was cancelled");
+        }
         // Compute the absolute lucene docID across sub-indexes as doc that is passed is relative to the current reader
         _docIds.add(context.docBase + doc);
       }
     };
   }
+
+  /**
+   * Requests cancellation of the search using this collector: every subsequent {@code collect()} call throws a
+   * {@link RuntimeException} instead of recording the document.
+   */
+  public void markShouldCancel() {
+    _shouldCancel = true;
+  }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java
index f5ddd1f76f..b43d05ec0b 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java
@@ -20,11 +20,13 @@ package org.apache.pinot.segment.local.realtime.impl.invertedindex;
 
 import java.io.File;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import org.apache.lucene.analysis.standard.StandardAnalyzer;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.queryparser.classic.QueryParser;
-import org.apache.lucene.search.Collector;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.SearcherManager;
@@ -109,27 +111,52 @@ public class RealtimeLuceneTextIndex implements MutableTextIndex {
   @Override
   public MutableRoaringBitmap getDocIds(String searchQuery) {
     MutableRoaringBitmap docIDs = new MutableRoaringBitmap();
-    Collector docIDCollector = new RealtimeLuceneDocIdCollector(docIDs);
-    IndexSearcher indexSearcher = null;
-    try {
-      Query query = new QueryParser(_column, _analyzer).parse(searchQuery);
-      indexSearcher = _searcherManager.acquire();
-      indexSearcher.search(query, docIDCollector);
-      return getPinotDocIds(indexSearcher, docIDs);
-    } catch (Exception e) {
-      LOGGER
-          .error("Failed while searching the realtime text index for column {}, search query {}, exception {}", _column,
-              searchQuery, e.getMessage());
-      throw new RuntimeException(e);
-    } finally {
+    RealtimeLuceneDocIdCollector docIDCollector = new RealtimeLuceneDocIdCollector(docIDs);
+    // A thread interrupt during indexSearcher.search() can break the underlying FSDirectory used by the IndexWriter
+    // which the SearcherManager is created with. To ensure the index is never corrupted the search is executed
+    // in a child thread and the interrupt is handled in the current thread by canceling the search gracefully.
+    // See https://github.com/apache/lucene/issues/3315 and https://github.com/apache/lucene/issues/9309
+    Callable<MutableRoaringBitmap> searchCallable = () -> {
+      IndexSearcher indexSearcher = null;
       try {
-        if (indexSearcher != null) {
-          _searcherManager.release(indexSearcher);
+        Query query = new QueryParser(_column, _analyzer).parse(searchQuery);
+        indexSearcher = _searcherManager.acquire();
+        indexSearcher.search(query, docIDCollector);
+        return getPinotDocIds(indexSearcher, docIDs);
+      } finally {
+        try {
+          if (indexSearcher != null) {
+            _searcherManager.release(indexSearcher);
+          }
+        } catch (Exception e) {
+          LOGGER.error(
+              "Failed while releasing the searcher manager for realtime text index for column {}, exception {}",
+              _column, e.getMessage());
         }
-      } catch (Exception e) {
-        LOGGER.error("Failed while releasing the searcher manager for realtime text index for column {}, exception {}",
-            _column, e.getMessage());
       }
+    };
+    // The pool is looked up per call instead of being cached in a static field, so loading this class does not
+    // require RealtimeLuceneTextIndexSearcherPool.init() to have run already.
+    Future<MutableRoaringBitmap> searchFuture =
+        RealtimeLuceneTextIndexSearcherPool.getInstance().getExecutorService().submit(searchCallable);
+    try {
+      return searchFuture.get();
+    } catch (InterruptedException e) {
+      // Never interrupt the searcher thread (see above). Drop the search if it has not started yet; otherwise the
+      // collector aborts it at its next collect() call.
+      searchFuture.cancel(false);
+      docIDCollector.markShouldCancel();
+      // Restore the interrupt status so that callers up the stack can still observe the interruption.
+      Thread.currentThread().interrupt();
+      LOGGER.warn("TEXT_MATCH query timeout on realtime consuming segment {}, column {}, search query {}", _segmentName,
+          _column, searchQuery);
+      throw new RuntimeException("TEXT_MATCH query timeout on realtime consuming segment");
+    } catch (ExecutionException e) {
+      // Report and rethrow the parse/search failure itself rather than the ExecutionException wrapper.
+      Throwable cause = e.getCause();
+      LOGGER.error("Failed while searching the realtime text index for segment {}, column {}, search query {},"
+          + " exception {}", _segmentName, _column, searchQuery, cause.getMessage());
+      throw new RuntimeException(cause);
     }
   }
 
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndexSearcherPool.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndexSearcherPool.java
new file mode 100644
index 0000000000..d1372106e0
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndexSearcherPool.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.realtime.impl.invertedindex;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * This class manages a thread pool used for searching over realtime Lucene segments by {@link RealtimeLuceneTextIndex}.
+ * BaseServerStarter sizes the pool to pinot.query.scheduler.query_worker_threads so that each query worker thread can
+ * have an accompanying Lucene searcher thread if needed. init() is called in BaseServerStarter to avoid creating a
+ * dependency on pinot-core.
+ */
+public class RealtimeLuceneTextIndexSearcherPool {
+  // Idle searcher threads are released after this long, so an idle server holds no searcher threads.
+  private static final long THREAD_KEEP_ALIVE_MS = 500;
+
+  // volatile so that a pool installed by init() is visible to any thread that later calls getInstance().
+  private static volatile RealtimeLuceneTextIndexSearcherPool _singletonInstance;
+
+  private final ExecutorService _executorService;
+
+  private RealtimeLuceneTextIndexSearcherPool(int size) {
+    // Core size must equal max size: with an unbounded queue ThreadPoolExecutor never grows beyond its core size,
+    // so a core size of 0 would run every search on one thread. Core threads are allowed to time out instead.
+    ThreadPoolExecutor executor =
+        new ThreadPoolExecutor(size, size, THREAD_KEEP_ALIVE_MS, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
+    executor.allowCoreThreadTimeOut(true);
+    _executorService = executor;
+  }
+
+  /**
+   * Returns the pool created by the most recent {@link #init(int)} call.
+   *
+   * @throws IllegalStateException if {@link #init(int)} has not been called yet
+   */
+  public static RealtimeLuceneTextIndexSearcherPool getInstance() {
+    RealtimeLuceneTextIndexSearcherPool instance = _singletonInstance;
+    if (instance == null) {
+      throw new IllegalStateException("RealtimeLuceneTextIndexSearcherPool.init() must be called first");
+    }
+    return instance;
+  }
+
+  /**
+   * Creates a pool with {@code size} searcher threads and makes it the instance returned by {@link #getInstance()}.
+   * A later call replaces that instance; the previously created pool is not shut down.
+   *
+   * @param size number of searcher threads, i.e. the maximum number of concurrent searches
+   * @return the newly created pool
+   * @throws IllegalArgumentException if {@code size} is not positive
+   */
+  public static RealtimeLuceneTextIndexSearcherPool init(int size) {
+    RealtimeLuceneTextIndexSearcherPool pool = new RealtimeLuceneTextIndexSearcherPool(size);
+    _singletonInstance = pool;
+    return pool;
+  }
+
+  /**
+   * Returns the executor that realtime Lucene text index searches are submitted to.
+   */
+  public ExecutorService getExecutorService() {
+    return _executorService;
+  }
+}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneMutableTextIndexTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneMutableTextIndexTest.java
index d521a981f2..e7046f051a 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneMutableTextIndexTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneMutableTextIndexTest.java
@@ -19,9 +19,15 @@
 package org.apache.pinot.segment.local.realtime.impl.invertedindex;
 
 import java.io.File;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import org.apache.commons.io.FileUtils;
 import org.apache.lucene.search.SearcherManager;
 import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -32,6 +38,9 @@ import static org.testng.Assert.assertEquals;
 public class LuceneMutableTextIndexTest {
   private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "LuceneMutableIndexTest");
   private static final String TEXT_COLUMN_NAME = "testColumnName";
+  // RealtimeLuceneTextIndex.getDocIds() runs searches on this pool; the cancellation test needs a single thread.
+  private static final RealtimeLuceneTextIndexSearcherPool SEARCHER_POOL =
+      RealtimeLuceneTextIndexSearcherPool.init(1);
 
   private RealtimeLuceneTextIndex _realtimeLuceneTextIndex;
 
@@ -41,16 +50,30 @@ public class LuceneMutableTextIndexTest {
     };
   }
 
+  private String[][] getRepeatedData() {
+    return new String[][]{
+        {"distributed storage", "multi-threading"}
+    };
+  }
+
   @BeforeClass
   public void setUp()
       throws Exception {
     _realtimeLuceneTextIndex =
         new RealtimeLuceneTextIndex(TEXT_COLUMN_NAME, INDEX_DIR, "fooBar", null, null, true, 500);
     String[][] documents = getTextData();
+    String[][] repeatedDocuments = getRepeatedData();
+
     for (String[] row : documents) {
       _realtimeLuceneTextIndex.add(row);
     }
 
+    for (int i = 0; i < 1000; i++) {
+      for (String[] row : repeatedDocuments) {
+        _realtimeLuceneTextIndex.add(row);
+      }
+    }
+
     SearcherManager searcherManager = _realtimeLuceneTextIndex.getSearcherManager();
     try {
       searcherManager.maybeRefresh();
@@ -70,4 +93,29 @@ public class LuceneMutableTextIndexTest {
     assertEquals(_realtimeLuceneTextIndex.getDocIds("/.*house.*/"), ImmutableRoaringBitmap.bitmapOf(1));
     assertEquals(_realtimeLuceneTextIndex.getDocIds("invalid"), ImmutableRoaringBitmap.bitmapOf());
   }
+
+  @Test(expectedExceptions = ExecutionException.class,
+      expectedExceptionsMessageRegExp = ".*TEXT_MATCH query timeout on realtime consuming segment.*")
+  public void testQueryCancellationIsSuccessful()
+      throws InterruptedException, ExecutionException {
+    // Occupy the single searcher thread so the search submitted below is still pending when getDocIds() observes
+    // the interrupt; otherwise a fast search could complete first and make this test flaky.
+    CountDownLatch releaseSearcher = new CountDownLatch(1);
+    RealtimeLuceneTextIndexSearcherPool.getInstance().getExecutorService().submit(() -> {
+      releaseSearcher.await();
+      return null;
+    });
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    try {
+      Future<MutableRoaringBitmap> res = executor.submit(() -> {
+        // Simulate the query timeout: the query thread is interrupted while it waits for the search result.
+        Thread.currentThread().interrupt();
+        return _realtimeLuceneTextIndex.getDocIds("/.*read.*/");
+      });
+      res.get();
+    } finally {
+      releaseSearcher.countDown();
+      executor.shutdownNow();
+    }
+  }
 }
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeAndLuceneMutableTextIndexTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeAndLuceneMutableTextIndexTest.java
index 31a82bd343..a7c85c6b8d 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeAndLuceneMutableTextIndexTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeAndLuceneMutableTextIndexTest.java
@@ -34,6 +34,8 @@ public class NativeAndLuceneMutableTextIndexTest {
   private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), 
"RealTimeNativeVsLuceneTest");
   private static final String TEXT_COLUMN_NAME = "testColumnName";
   private static final String MV_TEXT_COLUMN_NAME = "testMVColumnName";
+  private static final RealtimeLuceneTextIndexSearcherPool SEARCHER_POOL =
+      RealtimeLuceneTextIndexSearcherPool.init(1);  // RealtimeLuceneTextIndex searches run on this pool
 
   private RealtimeLuceneTextIndex _realtimeLuceneTextIndex;
   private NativeMutableTextIndex _nativeMutableTextIndex;
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index dac34e210b..2b771707d2 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -66,9 +66,11 @@ import org.apache.pinot.common.version.PinotVersion;
 import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import 
org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager;
+import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
 import org.apache.pinot.core.transport.ListenerConfig;
 import org.apache.pinot.core.util.ListenerConfigUtil;
 import 
org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneIndexRefreshState;
+import 
org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneTextIndexSearcherPool;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.server.access.AccessControlFactory;
 import org.apache.pinot.server.api.AdminApiApplication;
@@ -137,6 +139,7 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
   protected AdminApiApplication _adminApiApplication;
   protected ServerQueriesDisabledTracker _serverQueriesDisabledTracker;
   protected RealtimeLuceneIndexRefreshState _realtimeLuceneIndexRefreshState;
+  protected RealtimeLuceneTextIndexSearcherPool 
_realtimeLuceneTextIndexSearcherPool;
   protected PinotEnvironmentProvider _pinotEnvironmentProvider;
   protected volatile boolean _isServerReadyToServeQueries = false;
 
@@ -556,6 +559,12 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
               + "'", e);
     }
 
+    // Initialized before state transitions start, since realtime Lucene text index searches run on this pool
+    LOGGER.info("Initializing lucene searcher thread pool");
+    int queryWorkerThreads =
+        _serverConf.getProperty(ResourceManager.QUERY_WORKER_CONFIG_KEY, 
ResourceManager.DEFAULT_QUERY_WORKER_THREADS);
+    _realtimeLuceneTextIndexSearcherPool = 
RealtimeLuceneTextIndexSearcherPool.init(queryWorkerThreads);
+
     LOGGER.info("Initializing server instance and registering state model 
factory");
     Utils.logVersions();
     ControllerLeaderLocator.create(_helixManager);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to